changed addrs.txt to JSON addrs file
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import base64
7 import json
8 import os
9 import random
10 import sys
11 import time
12 import signal
13 import traceback
14 import urlparse
15
16 try:
17     from twisted.internet import iocpreactor
18     iocpreactor.install()
19 except:
20     pass
21 else:
22     print 'Using IOCP reactor!'
23 from twisted.internet import defer, reactor, protocol, task
24 from twisted.web import server
25 from twisted.python import log
26 from nattraverso import portmapper, ipdiscover
27
28 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
29 from bitcoin import worker_interface, height_tracker
30 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
31 from . import p2p, networks, web
32 import p2pool, p2pool.data as p2pool_data
33
34 @deferral.retry('Error getting work from bitcoind:', 3)
35 @defer.inlineCallbacks
36 def getwork(bitcoind):
37     try:
38         work = yield bitcoind.rpc_getmemorypool()
39     except jsonrpc.Error, e:
40         if e.code == -32601: # Method not found
41             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
42             raise deferral.RetrySilentlyException()
43         raise
44     packed_transactions = [x.decode('hex') for x in work['transactions']]
45     defer.returnValue(dict(
46         version=work['version'],
47         previous_block_hash=int(work['previousblockhash'], 16),
48         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
49         merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
50         subsidy=work['coinbasevalue'],
51         time=work['time'],
52         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
53         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
54     ))
55
56 @defer.inlineCallbacks
57 def main(args, net, datadir_path, merged_urls, worker_endpoint):
58     try:
59         print 'p2pool (version %s)' % (p2pool.__version__,)
60         print
61         
62         # connect to bitcoind over JSON-RPC and do initial getmemorypool
63         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
64         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
65         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
66         @deferral.retry('Error while checking Bitcoin connection:', 1)
67         @defer.inlineCallbacks
68         def check():
69             if not (yield net.PARENT.RPC_CHECK)(bitcoind):
70                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
71                 raise deferral.RetrySilentlyException()
72             temp_work = yield getwork(bitcoind)
73             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
74                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
75                 raise deferral.RetrySilentlyException()
76             defer.returnValue(temp_work)
77         temp_work = yield check()
78         print '    ...success!'
79         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
80         print
81         
82         # connect to bitcoind over bitcoin-p2p
83         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
84         factory = bitcoin_p2p.ClientFactory(net.PARENT)
85         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
86         yield factory.getProtocol() # waits until handshake is successful
87         print '    ...success!'
88         print
89         
90         print 'Determining payout address...'
91         if args.pubkey_hash is None:
92             address_path = os.path.join(datadir_path, 'cached_payout_address')
93             
94             if os.path.exists(address_path):
95                 with open(address_path, 'rb') as f:
96                     address = f.read().strip('\r\n')
97                 print '    Loaded cached address: %s...' % (address,)
98             else:
99                 address = None
100             
101             if address is not None:
102                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
103                 if not res['isvalid'] or not res['ismine']:
104                     print '    Cached address is either invalid or not controlled by local bitcoind!'
105                     address = None
106             
107             if address is None:
108                 print '    Getting payout address from bitcoind...'
109                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
110             
111             with open(address_path, 'wb') as f:
112                 f.write(address)
113             
114             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
115         else:
116             my_pubkey_hash = args.pubkey_hash
117         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
118         print
119         
120         my_share_hashes = set()
121         my_doa_share_hashes = set()
122         
123         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
124         shared_share_hashes = set()
125         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
126         known_verified = set()
127         print "Loading shares..."
128         for i, (mode, contents) in enumerate(ss.get_shares()):
129             if mode == 'share':
130                 if contents.hash in tracker.shares:
131                     continue
132                 shared_share_hashes.add(contents.hash)
133                 contents.time_seen = 0
134                 tracker.add(contents)
135                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136                     print "    %i" % (len(tracker.shares),)
137             elif mode == 'verified_hash':
138                 known_verified.add(contents)
139             else:
140                 raise AssertionError()
141         print "    ...inserting %i verified shares..." % (len(known_verified),)
142         for h in known_verified:
143             if h not in tracker.shares:
144                 ss.forget_verified_share(h)
145                 continue
146             tracker.verified.add(tracker.shares[h])
147         print "    ...done loading %i shares!" % (len(tracker.shares),)
148         print
149         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
150         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
151         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
152         
153         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
154         
155         pre_current_work = variable.Variable(None)
156         pre_merged_work = variable.Variable({})
157         # information affecting work that should trigger a long-polling update
158         current_work = variable.Variable(None)
159         # information affecting work that should not trigger a long-polling update
160         current_work2 = variable.Variable(None)
161         
162         requested = expiring_dict.ExpiringDict(300)
163         
164         print 'Initializing work...'
165         @defer.inlineCallbacks
166         def set_real_work1():
167             work = yield getwork(bitcoind)
168             current_work2.set(dict(
169                 time=work['time'],
170                 transactions=work['transactions'],
171                 merkle_link=work['merkle_link'],
172                 subsidy=work['subsidy'],
173                 clock_offset=time.time() - work['time'],
174                 last_update=time.time(),
175             )) # second set first because everything hooks on the first
176             pre_current_work.set(dict(
177                 version=work['version'],
178                 previous_block=work['previous_block_hash'],
179                 bits=work['bits'],
180                 coinbaseflags=work['coinbaseflags'],
181             ))
182         yield set_real_work1()
183         
184         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
185         
186         def set_real_work2():
187             best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
188             
189             t = dict(pre_current_work.value)
190             t['best_share_hash'] = best
191             t['mm_chains'] = pre_merged_work.value
192             current_work.set(t)
193             
194             t = time.time()
195             for peer2, share_hash in desired:
196                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
197                     continue
198                 last_request_time, count = requested.get(share_hash, (None, 0))
199                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
200                     continue
201                 potential_peers = set()
202                 for head in tracker.tails[share_hash]:
203                     potential_peers.update(peer_heads.get(head, set()))
204                 potential_peers = [peer for peer in potential_peers if peer.connected2]
205                 if count == 0 and peer2 is not None and peer2.connected2:
206                     peer = peer2
207                 else:
208                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
209                     if peer is None:
210                         continue
211                 
212                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
213                 peer.send_getshares(
214                     hashes=[share_hash],
215                     parents=2000,
216                     stops=list(set(tracker.heads) | set(
217                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
218                     ))[:100],
219                 )
220                 requested[share_hash] = t, count + 1
221         pre_current_work.changed.watch(lambda _: set_real_work2())
222         pre_merged_work.changed.watch(lambda _: set_real_work2())
223         set_real_work2()
224         print '    ...success!'
225         print
226         
227         
228         @defer.inlineCallbacks
229         def set_merged_work(merged_url, merged_userpass):
230             merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
231             while True:
232                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
233                 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
234                     hash=int(auxblock['hash'], 16),
235                     target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
236                     merged_proxy=merged_proxy,
237                 )}))
238                 yield deferral.sleep(1)
239         for merged_url, merged_userpass in merged_urls:
240             set_merged_work(merged_url, merged_userpass)
241         
242         @pre_merged_work.changed.watch
243         def _(new_merged_work):
244             print 'Got new merged mining work!'
245         
246         # setup p2p logic and join p2pool network
247         
248         class Node(p2p.Node):
249             def handle_shares(self, shares, peer):
250                 if len(shares) > 5:
251                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
252                 
253                 new_count = 0
254                 for share in shares:
255                     if share.hash in tracker.shares:
256                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
257                         continue
258                     
259                     new_count += 1
260                     
261                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
262                     
263                     tracker.add(share)
264                 
265                 if shares and peer is not None:
266                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
267                 
268                 if new_count:
269                     set_real_work2()
270                 
271                 if len(shares) > 5:
272                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
273             
274             def handle_share_hashes(self, hashes, peer):
275                 t = time.time()
276                 get_hashes = []
277                 for share_hash in hashes:
278                     if share_hash in tracker.shares:
279                         continue
280                     last_request_time, count = requested.get(share_hash, (None, 0))
281                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
282                         continue
283                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
284                     get_hashes.append(share_hash)
285                     requested[share_hash] = t, count + 1
286                 
287                 if hashes and peer is not None:
288                     peer_heads.setdefault(hashes[0], set()).add(peer)
289                 if get_hashes:
290                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
291             
292             def handle_get_shares(self, hashes, parents, stops, peer):
293                 parents = min(parents, 1000//len(hashes))
294                 stops = set(stops)
295                 shares = []
296                 for share_hash in hashes:
297                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
298                         if share.hash in stops:
299                             break
300                         shares.append(share)
301                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
302                 return shares
303         
304         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
305         def submit_block_p2p(block):
306             if factory.conn.value is None:
307                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
308                 raise deferral.RetrySilentlyException()
309             factory.conn.value.send_block(block=block)
310         
311         @deferral.retry('Error submitting block: (will retry)', 10, 10)
312         @defer.inlineCallbacks
313         def submit_block_rpc(block, ignore_failure):
314             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
315             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
316             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
317                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
318         
319         def submit_block(block, ignore_failure):
320             submit_block_p2p(block)
321             submit_block_rpc(block, ignore_failure)
322         
323         @tracker.verified.added.watch
324         def _(share):
325             if share.pow_hash <= share.header['bits'].target:
326                 submit_block(share.as_block(tracker), ignore_failure=True)
327                 print
328                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
329                 print
330                 if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
331                     broadcast_share(share.hash)
332         
333         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
334         
335         @defer.inlineCallbacks
336         def parse(x):
337             if ':' in x:
338                 ip, port = x.split(':')
339                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
340             else:
341                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
342         
343         addrs = {}
344         if os.path.exists(os.path.join(datadir_path, 'addrs')):
345             try:
346                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
347                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
348             except:
349                 print >>sys.stderr, 'error parsing addrs'
350         elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
351             try:
352                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
353             except:
354                 print >>sys.stderr, "error reading addrs.txt"
355         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
356             try:
357                 addr = yield addr_df
358                 if addr not in addrs:
359                     addrs[addr] = (0, time.time(), time.time())
360             except:
361                 log.err()
362         
363         connect_addrs = set()
364         for addr_df in map(parse, args.p2pool_nodes):
365             try:
366                 connect_addrs.add((yield addr_df))
367             except:
368                 log.err()
369         
370         p2p_node = Node(
371             best_share_hash_func=lambda: current_work.value['best_share_hash'],
372             port=args.p2pool_port,
373             net=net,
374             addr_store=addrs,
375             connect_addrs=connect_addrs,
376             max_incoming_conns=args.p2pool_conns,
377         )
378         p2p_node.start()
379         
380         def save_addrs():
381             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
382                 f.write(json.dumps(p2p_node.addr_store.items()))
383         task.LoopingCall(save_addrs).start(60)
384         
385         def broadcast_share(share_hash):
386             shares = []
387             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
388                 if share.hash in shared_share_hashes:
389                     break
390                 shared_share_hashes.add(share.hash)
391                 shares.append(share)
392             
393             for peer in p2p_node.peers.itervalues():
394                 peer.sendShares([share for share in shares if share.peer is not peer])
395         
396         # send share when the chain changes to their chain
397         current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
398         
399         def save_shares():
400             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
401                 ss.add_share(share)
402                 if share.hash in tracker.verified.shares:
403                     ss.add_verified_hash(share.hash)
404         task.LoopingCall(save_shares).start(60)
405         
406         print '    ...success!'
407         print
408         
409         if args.upnp:
410             @defer.inlineCallbacks
411             def upnp_thread():
412                 while True:
413                     try:
414                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
415                         if is_lan:
416                             pm = yield portmapper.get_port_mapper()
417                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
418                     except defer.TimeoutError:
419                         pass
420                     except:
421                         if p2pool.DEBUG:
422                             log.err(None, 'UPnP error:')
423                     yield deferral.sleep(random.expovariate(1/120))
424             upnp_thread()
425         
426         # start listening for workers with a JSON-RPC server
427         
428         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
429         
430         # setup worker logic
431         
432         removed_unstales_var = variable.Variable((0, 0, 0))
433         removed_doa_unstales_var = variable.Variable(0)
434         @tracker.verified.removed.watch
435         def _(share):
436             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
437                 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
438                 removed_unstales_var.set((
439                     removed_unstales_var.value[0] + 1,
440                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
441                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
442                 ))
443             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
444                 removed_doa_unstales.set(removed_doa_unstales.value + 1)
445         
446         def get_stale_counts():
447             '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
448             my_shares = len(my_share_hashes)
449             my_doa_shares = len(my_doa_share_hashes)
450             delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
451             my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
452             my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
453             orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
454             doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
455             
456             my_shares_not_in_chain = my_shares - my_shares_in_chain
457             my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
458             
459             return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
460         
461         
462         pseudoshare_received = variable.Event()
463         share_received = variable.Event()
464         local_rate_monitor = math.RateMonitor(10*60)
465         
466         class WorkerBridge(worker_interface.WorkerBridge):
467             def __init__(self):
468                 worker_interface.WorkerBridge.__init__(self)
469                 self.new_work_event = current_work.changed
470                 self.recent_shares_ts_work = []
471             
472             def get_user_details(self, request):
473                 user = request.getUser() if request.getUser() is not None else ''
474                 
475                 desired_pseudoshare_target = None
476                 if '+' in user:
477                     user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
478                     try:
479                         desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
480                     except:
481                         pass
482                 
483                 desired_share_target = 2**256 - 1
484                 if '/' in user:
485                     user, min_diff_str = user.rsplit('/', 1)
486                     try:
487                         desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
488                     except:
489                         pass
490                 
491                 if random.uniform(0, 100) < args.worker_fee:
492                     pubkey_hash = my_pubkey_hash
493                 else:
494                     try:
495                         pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
496                     except: # XXX blah
497                         pubkey_hash = my_pubkey_hash
498                 
499                 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
500             
501             def preprocess_request(self, request):
502                 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
503                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
504             
505             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
506                 if len(p2p_node.peers) == 0 and net.PERSIST:
507                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
508                 if current_work.value['best_share_hash'] is None and net.PERSIST:
509                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
510                 if time.time() > current_work2.value['last_update'] + 60:
511                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
512                 
513                 if current_work.value['mm_chains']:
514                     tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
515                     mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
516                     mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
517                         merkle_root=bitcoin_data.merkle_hash(mm_hashes),
518                         size=size,
519                         nonce=0,
520                     ))
521                     mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
522                 else:
523                     mm_data = ''
524                     mm_later = []
525                 
526                 if True:
527                     share_info, generate_tx = p2pool_data.Share.generate_transaction(
528                         tracker=tracker,
529                         share_data=dict(
530                             previous_share_hash=current_work.value['best_share_hash'],
531                             coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
532                             nonce=random.randrange(2**32),
533                             pubkey_hash=pubkey_hash,
534                             subsidy=current_work2.value['subsidy'],
535                             donation=math.perfect_round(65535*args.donation_percentage/100),
536                             stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
537                                 253 if orphans > orphans_recorded_in_chain else
538                                 254 if doas > doas_recorded_in_chain else
539                                 0
540                             )(*get_stale_counts()),
541                             desired_version=1,
542                         ),
543                         block_target=current_work.value['bits'].target,
544                         desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
545                         desired_target=desired_share_target,
546                         ref_merkle_link=dict(branch=[], index=0),
547                         net=net,
548                     )
549                 
550                 target = net.PARENT.SANE_MAX_TARGET
551                 if desired_pseudoshare_target is None:
552                     if len(self.recent_shares_ts_work) == 50:
553                         hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
554                         target = min(target, 2**256//hash_rate)
555                 else:
556                     target = min(target, desired_pseudoshare_target)
557                 target = max(target, share_info['bits'].target)
558                 for aux_work in current_work.value['mm_chains'].itervalues():
559                     target = max(target, aux_work['target'])
560                 
561                 transactions = [generate_tx] + list(current_work2.value['transactions'])
562                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
563                 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
564                 
565                 getwork_time = time.time()
566                 merkle_link = current_work2.value['merkle_link']
567                 
568                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
569                     bitcoin_data.target_to_difficulty(target),
570                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
571                     current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
572                     len(current_work2.value['transactions']),
573                 )
574                 
575                 ba = bitcoin_getwork.BlockAttempt(
576                     version=current_work.value['version'],
577                     previous_block=current_work.value['previous_block'],
578                     merkle_root=merkle_root,
579                     timestamp=current_work2.value['time'],
580                     bits=current_work.value['bits'],
581                     share_target=target,
582                 )
583                 
584                 received_header_hashes = set()
585                 
586                 def got_response(header, request):
587                     user, _, _, _ = self.get_user_details(request)
588                     assert header['merkle_root'] == merkle_root
589                     
590                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
591                     pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
592                     on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
593                     
594                     try:
595                         if pow_hash <= header['bits'].target or p2pool.DEBUG:
596                             submit_block(dict(header=header, txs=transactions), ignore_failure=False)
597                             if pow_hash <= header['bits'].target:
598                                 print
599                                 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
600                                 print
601                     except:
602                         log.err(None, 'Error while processing potential block:')
603                     
604                     for aux_work, index, hashes in mm_later:
605                         try:
606                             if pow_hash <= aux_work['target'] or p2pool.DEBUG:
607                                 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
608                                     pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
609                                     bitcoin_data.aux_pow_type.pack(dict(
610                                         merkle_tx=dict(
611                                             tx=transactions[0],
612                                             block_hash=header_hash,
613                                             merkle_link=merkle_link,
614                                         ),
615                                         merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
616                                         parent_block_header=header,
617                                     )).encode('hex'),
618                                 )
619                                 @df.addCallback
620                                 def _(result):
621                                     if result != (pow_hash <= aux_work['target']):
622                                         print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
623                                     else:
624                                         print 'Merged block submittal result: %s' % (result,)
625                                 @df.addErrback
626                                 def _(err):
627                                     log.err(err, 'Error submitting merged block:')
628                         except:
629                             log.err(None, 'Error while processing merged mining POW:')
630                     
631                     if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
632                         min_header = dict(header);del min_header['merkle_root']
633                         hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
634                         share = p2pool_data.Share(net, None, dict(
635                             min_header=min_header, share_info=share_info, hash_link=hash_link,
636                             ref_merkle_link=dict(branch=[], index=0),
637                         ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
638                         
639                         print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
640                             request.getUser(),
641                             p2pool_data.format_hash(share.hash),
642                             p2pool_data.format_hash(share.previous_hash),
643                             time.time() - getwork_time,
644                             ' DEAD ON ARRIVAL' if not on_time else '',
645                         )
646                         my_share_hashes.add(share.hash)
647                         if not on_time:
648                             my_doa_share_hashes.add(share.hash)
649                         
650                         tracker.add(share)
651                         if not p2pool.DEBUG:
652                             tracker.verified.add(share)
653                         set_real_work2()
654                         
655                         try:
656                             if pow_hash <= header['bits'].target or p2pool.DEBUG:
657                                 for peer in p2p_node.peers.itervalues():
658                                     peer.sendShares([share])
659                                 shared_share_hashes.add(share.hash)
660                         except:
661                             log.err(None, 'Error forwarding block solution:')
662                         
663                         share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
664                     
665                     if pow_hash > target:
666                         print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
667                         print '    Hash:   %56x' % (pow_hash,)
668                         print '    Target: %56x' % (target,)
669                     elif header_hash in received_header_hashes:
670                         print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
671                     else:
672                         received_header_hashes.add(header_hash)
673                         
674                         pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
675                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
676                         while len(self.recent_shares_ts_work) > 50:
677                             self.recent_shares_ts_work.pop(0)
678                         local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
679                     
680                     return on_time
681                 
682                 return ba, got_response
683         
684         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
685         
686         web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
687         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
688         
689         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
690         
691         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
692             pass
693         
694         print '    ...success!'
695         print
696         
697         
698         @defer.inlineCallbacks
699         def work_poller():
700             while True:
701                 flag = factory.new_block.get_deferred()
702                 try:
703                     yield set_real_work1()
704                 except:
705                     log.err()
706                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
707         work_poller()
708         
709         
710         # done!
711         print 'Started successfully!'
712         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
713         if args.donation_percentage > 0.51:
714             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
715         elif args.donation_percentage < 0.49:
716             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
717         else:
718             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
719             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
720         print
721         
722         
723         if hasattr(signal, 'SIGALRM'):
724             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
725                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
726             ))
727             signal.siginterrupt(signal.SIGALRM, False)
728             task.LoopingCall(signal.alarm, 30).start(1)
729         
730         if args.irc_announce:
731             from twisted.words.protocols import irc
732             class IRCClient(irc.IRCClient):
733                 nickname = 'p2pool%02i' % (random.randrange(100),)
734                 channel = net.ANNOUNCE_CHANNEL
735                 def lineReceived(self, line):
736                     if p2pool.DEBUG:
737                         print repr(line)
738                     irc.IRCClient.lineReceived(self, line)
739                 def signedOn(self):
740                     irc.IRCClient.signedOn(self)
741                     self.factory.resetDelay()
742                     self.join(self.channel)
743                     @defer.inlineCallbacks
744                     def new_share(share):
745                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
746                             yield deferral.sleep(random.expovariate(1/60))
747                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
748                             if message not in self.recent_messages:
749                                 self.say(self.channel, message)
750                                 self._remember_message(message)
751                     self.watch_id = tracker.verified.added.watch(new_share)
752                     self.recent_messages = []
753                 def _remember_message(self, message):
754                     self.recent_messages.append(message)
755                     while len(self.recent_messages) > 100:
756                         self.recent_messages.pop(0)
757                 def privmsg(self, user, channel, message):
758                     if channel == self.channel:
759                         self._remember_message(message)
760                 def connectionLost(self, reason):
761                     tracker.verified.added.unwatch(self.watch_id)
762                     print 'IRC connection lost:', reason.getErrorMessage()
763             class IRCClientFactory(protocol.ReconnectingClientFactory):
764                 protocol = IRCClient
765             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
766         
767         @defer.inlineCallbacks
768         def status_thread():
769             last_str = None
770             last_time = 0
771             while True:
772                 yield deferral.sleep(3)
773                 try:
774                     if time.time() > current_work2.value['last_update'] + 60:
775                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
776                     
777                     height = tracker.get_height(current_work.value['best_share_hash'])
778                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
779                         height,
780                         len(tracker.verified.shares),
781                         len(tracker.shares),
782                         len(p2p_node.peers),
783                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
784                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
785                     
786                     datums, dt = local_rate_monitor.get_datums_in_last()
787                     my_att_s = sum(datum['work']/dt for datum in datums)
788                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
789                         math.format(int(my_att_s)),
790                         math.format_dt(dt),
791                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
792                         math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
793                     )
794                     
795                     if height > 2:
796                         (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
797                         stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
798                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
799                         
800                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
801                             shares, stale_orphan_shares, stale_doa_shares,
802                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
803                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
804                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
805                         )
806                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
807                             math.format(int(real_att_s)),
808                             100*stale_prop,
809                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
810                         )
811                         
812                         desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
813                         majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
814                         if majority_desired_version not in [0, 1]:
815                             print >>sys.stderr, '#'*40
816                             print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
817                                 majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
818                             print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
819                             print >>sys.stderr, '#'*40
820                     
821                     if this_str != last_str or time.time() > last_time + 15:
822                         print this_str
823                         last_str = this_str
824                         last_time = time.time()
825                 except:
826                     log.err()
827         status_thread()
828     except:
829         reactor.stop()
830         log.err(None, 'Fatal error:')
831
832 def run():
833     class FixedArgumentParser(argparse.ArgumentParser):
834         def _read_args_from_files(self, arg_strings):
835             # expand arguments referencing files
836             new_arg_strings = []
837             for arg_string in arg_strings:
838                 
839                 # for regular arguments, just add them back into the list
840                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
841                     new_arg_strings.append(arg_string)
842                 
843                 # replace arguments referencing files with the file content
844                 else:
845                     try:
846                         args_file = open(arg_string[1:])
847                         try:
848                             arg_strings = []
849                             for arg_line in args_file.read().splitlines():
850                                 for arg in self.convert_arg_line_to_args(arg_line):
851                                     arg_strings.append(arg)
852                             arg_strings = self._read_args_from_files(arg_strings)
853                             new_arg_strings.extend(arg_strings)
854                         finally:
855                             args_file.close()
856                     except IOError:
857                         err = sys.exc_info()[1]
858                         self.error(str(err))
859             
860             # return the modified argument list
861             return new_arg_strings
862         
863         def convert_arg_line_to_args(self, arg_line):
864             return [arg for arg in arg_line.split() if arg.strip()]
865     
866     
867     realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
868     
869     parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
870     parser.add_argument('--version', action='version', version=p2pool.__version__)
871     parser.add_argument('--net',
872         help='use specified network (default: bitcoin)',
873         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
874     parser.add_argument('--testnet',
875         help='''use the network's testnet''',
876         action='store_const', const=True, default=False, dest='testnet')
877     parser.add_argument('--debug',
878         help='enable debugging mode',
879         action='store_const', const=True, default=False, dest='debug')
880     parser.add_argument('-a', '--address',
881         help='generate payouts to this address (default: <address requested from bitcoind>)',
882         type=str, action='store', default=None, dest='address')
883     parser.add_argument('--datadir',
884         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
885         type=str, action='store', default=None, dest='datadir')
886     parser.add_argument('--logfile',
887         help='''log to this file (default: data/<NET>/log)''',
888         type=str, action='store', default=None, dest='logfile')
889     parser.add_argument('--merged',
890         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
891         type=str, action='append', default=[], dest='merged_urls')
892     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
893         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
894         type=float, action='store', default=0.5, dest='donation_percentage')
895     parser.add_argument('--irc-announce',
896         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
897         action='store_true', default=False, dest='irc_announce')
898     
899     p2pool_group = parser.add_argument_group('p2pool interface')
900     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
901         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
902         type=int, action='store', default=None, dest='p2pool_port')
903     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
904         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
905         type=str, action='append', default=[], dest='p2pool_nodes')
906     parser.add_argument('--disable-upnp',
907         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
908         action='store_false', default=True, dest='upnp')
909     p2pool_group.add_argument('--max-conns', metavar='CONNS',
910         help='maximum incoming connections (default: 40)',
911         type=int, action='store', default=40, dest='p2pool_conns')
912     
913     worker_group = parser.add_argument_group('worker interface')
914     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
915         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
916         type=str, action='store', default=None, dest='worker_endpoint')
917     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
918         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
919         type=float, action='store', default=0, dest='worker_fee')
920     
921     bitcoind_group = parser.add_argument_group('bitcoind interface')
922     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
923         help='connect to this address (default: 127.0.0.1)',
924         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
925     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
926         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
927         type=int, action='store', default=None, dest='bitcoind_rpc_port')
928     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
929         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
930         type=int, action='store', default=None, dest='bitcoind_p2p_port')
931     
932     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
933         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
934         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
935     
936     args = parser.parse_args()
937     
938     if args.debug:
939         p2pool.DEBUG = True
940     
941     net_name = args.net_name + ('_testnet' if args.testnet else '')
942     net = networks.nets[net_name]
943     
944     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
945     if not os.path.exists(datadir_path):
946         os.makedirs(datadir_path)
947     
948     if len(args.bitcoind_rpc_userpass) > 2:
949         parser.error('a maximum of two arguments are allowed')
950     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
951     
952     if args.bitcoind_rpc_password is None:
953         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
954             parser.error('This network has no configuration file function. Manually enter your RPC password.')
955         conf_path = net.PARENT.CONF_FILE_FUNC()
956         if not os.path.exists(conf_path):
957             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
958                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
959                 '''\r\n'''
960                 '''server=1\r\n'''
961                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
962         with open(conf_path, 'rb') as f:
963             cp = ConfigParser.RawConfigParser()
964             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
965             for conf_name, var_name, var_type in [
966                 ('rpcuser', 'bitcoind_rpc_username', str),
967                 ('rpcpassword', 'bitcoind_rpc_password', str),
968                 ('rpcport', 'bitcoind_rpc_port', int),
969                 ('port', 'bitcoind_p2p_port', int),
970             ]:
971                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
972                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
973         if args.bitcoind_rpc_password is None:
974             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
975     
976     if args.bitcoind_rpc_username is None:
977         args.bitcoind_rpc_username = ''
978     
979     if args.bitcoind_rpc_port is None:
980         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
981     
982     if args.bitcoind_p2p_port is None:
983         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
984     
985     if args.p2pool_port is None:
986         args.p2pool_port = net.P2P_PORT
987     
988     if args.worker_endpoint is None:
989         worker_endpoint = '', net.WORKER_PORT
990     elif ':' not in args.worker_endpoint:
991         worker_endpoint = '', int(args.worker_endpoint)
992     else:
993         addr, port = args.worker_endpoint.rsplit(':', 1)
994         worker_endpoint = addr, int(port)
995     
996     if args.address is not None:
997         try:
998             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
999         except Exception, e:
1000             parser.error('error parsing address: ' + repr(e))
1001     else:
1002         args.pubkey_hash = None
1003     
1004     def separate_url(url):
1005         s = urlparse.urlsplit(url)
1006         if '@' not in s.netloc:
1007             parser.error('merged url netloc must contain an "@"')
1008         userpass, new_netloc = s.netloc.rsplit('@', 1)
1009         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
1010     merged_urls = map(separate_url, args.merged_urls)
1011     
1012     if args.logfile is None:
1013         args.logfile = os.path.join(datadir_path, 'log')
1014     
1015     logfile = logging.LogFile(args.logfile)
1016     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1017     sys.stdout = logging.AbortPipe(pipe)
1018     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
1019     if hasattr(signal, "SIGUSR1"):
1020         def sigusr1(signum, frame):
1021             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1022             logfile.reopen()
1023             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1024         signal.signal(signal.SIGUSR1, sigusr1)
1025     task.LoopingCall(logfile.reopen).start(5)
1026     
1027     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
1028     reactor.run()