expanded getblock height_cacher
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import os
7 import random
8 import struct
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
19
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
25
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    '''Fetch a block template from bitcoind and normalise it into a dict.

    Calls bitcoind.rpc_getmemorypool() and fires with a dict holding the keys
    version, previous_block_hash (as an int), transactions (unpacked),
    merkle_branch, subsidy, time, bits and coinbaseflags.

    A JSON-RPC "method not found" error (-32601) is reported on stderr and
    turned into deferral.RetrySilentlyException; any other jsonrpc.Error is
    re-raised unchanged.
    '''
    try:
        template = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error as err:
        if err.code != -32601:
            raise
        # -32601 == Method not found: this bitcoind has no getmemorypool call
        print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
        raise deferral.RetrySilentlyException()

    raw_txs = [tx_hex.decode('hex') for tx_hex in template['transactions']]

    # The fields are computed in the same order the result dict lists them.
    version = template['version']
    previous_block_hash = int(template['previousblockhash'], 16)
    transactions = map(bitcoin_data.tx_type.unpack, raw_txs)
    # A 0 is prepended to the transaction hashes and the branch is taken for
    # index 0.
    merkle_branch = bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, raw_txs), 0)
    subsidy = template['coinbasevalue']
    timestamp = template['time']

    if isinstance(template['bits'], (str, unicode)):
        # String form: hex-decode, reverse the bytes, then unpack.
        bits = bitcoin_data.FloatingIntegerType().unpack(template['bits'].decode('hex')[::-1])
    else:
        bits = bitcoin_data.FloatingInteger(template['bits'])

    if 'coinbaseflags' in template:
        coinbaseflags = template['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in template:
        coinbaseflags = ''.join(aux.decode('hex') for aux in template['coinbaseaux'].itervalues())
    else:
        coinbaseflags = ''

    defer.returnValue(dict(
        version=version,
        previous_block_hash=previous_block_hash,
        transactions=transactions,
        merkle_branch=merkle_branch,
        subsidy=subsidy,
        time=timestamp,
        bits=bits,
        coinbaseflags=coinbaseflags,
    ))
47
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
50     try:
51         print 'p2pool (version %s)' % (p2pool.__version__,)
52         print
53         
54         # connect to bitcoind over JSON-RPC and do initial getmemorypool
55         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
58         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
59         if not good:
60             print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
61             return
62         temp_work = yield getwork(bitcoind)
63         print '    ...success!'
64         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
65         print
66         
67         # connect to bitcoind over bitcoin-p2p
68         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
69         factory = bitcoin_p2p.ClientFactory(net.PARENT)
70         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
71         yield factory.getProtocol() # waits until handshake is successful
72         print '    ...success!'
73         print
74         
75         print 'Determining payout address...'
76         if args.pubkey_hash is None:
77             address_path = os.path.join(datadir_path, 'cached_payout_address')
78             
79             if os.path.exists(address_path):
80                 with open(address_path, 'rb') as f:
81                     address = f.read().strip('\r\n')
82                 print '    Loaded cached address: %s...' % (address,)
83             else:
84                 address = None
85             
86             if address is not None:
87                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
88                 if not res['isvalid'] or not res['ismine']:
89                     print '    Cached address is either invalid or not controlled by local bitcoind!'
90                     address = None
91             
92             if address is None:
93                 print '    Getting payout address from bitcoind...'
94                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
95             
96             with open(address_path, 'wb') as f:
97                 f.write(address)
98             
99             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
100         else:
101             my_pubkey_hash = args.pubkey_hash
102         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
103         print
104         
105         my_share_hashes = set()
106         my_doa_share_hashes = set()
107         
108         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
109         shared_share_hashes = set()
110         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
111         known_verified = set()
112         recent_blocks = []
113         print "Loading shares..."
114         for i, (mode, contents) in enumerate(ss.get_shares()):
115             if mode == 'share':
116                 if contents.hash in tracker.shares:
117                     continue
118                 shared_share_hashes.add(contents.hash)
119                 contents.time_seen = 0
120                 tracker.add(contents)
121                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
122                     print "    %i" % (len(tracker.shares),)
123             elif mode == 'verified_hash':
124                 known_verified.add(contents)
125             else:
126                 raise AssertionError()
127         print "    ...inserting %i verified shares..." % (len(known_verified),)
128         for h in known_verified:
129             if h not in tracker.shares:
130                 ss.forget_verified_share(h)
131                 continue
132             tracker.verified.add(tracker.shares[h])
133         print "    ...done loading %i shares!" % (len(tracker.shares),)
134         print
135         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
136         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
137         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
138         
139         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
140         
141         pre_current_work = variable.Variable(None)
142         pre_merged_work = variable.Variable({})
143         # information affecting work that should trigger a long-polling update
144         current_work = variable.Variable(None)
145         # information affecting work that should not trigger a long-polling update
146         current_work2 = variable.Variable(None)
147         
148         requested = expiring_dict.ExpiringDict(300)
149         
150         print 'Initializing work...'
151         @defer.inlineCallbacks
152         def set_real_work1():
153             work = yield getwork(bitcoind)
154             current_work2.set(dict(
155                 time=work['time'],
156                 transactions=work['transactions'],
157                 merkle_branch=work['merkle_branch'],
158                 subsidy=work['subsidy'],
159                 clock_offset=time.time() - work['time'],
160                 last_update=time.time(),
161             )) # second set first because everything hooks on the first
162             pre_current_work.set(dict(
163                 version=work['version'],
164                 previous_block=work['previous_block_hash'],
165                 bits=work['bits'],
166                 coinbaseflags=work['coinbaseflags'],
167             ))
168         yield set_real_work1()
169         
170         if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
171             @deferral.DeferredCacher
172             @defer.inlineCallbacks
173             def height_cacher(block_hash):
174                 x = yield bitcoind.rpc_getblock('%x' % (block_hash,))
175                 defer.returnValue(x['blockcount'] if 'blockcount' in x else x['height'])
176             best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
177             def get_height_rel_highest(block_hash):
178                 this_height = height_cacher.call_now(block_hash, 0)
179                 best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
180                 best_height_cached.set(max(best_height_cached.value, this_height, best_height))
181                 return this_height - best_height_cached.value
182         else:
183             get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
184         
185         def set_real_work2():
186             best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
187             
188             t = dict(pre_current_work.value)
189             t['best_share_hash'] = best
190             t['mm_chains'] = pre_merged_work.value
191             current_work.set(t)
192             
193             t = time.time()
194             for peer2, share_hash in desired:
195                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
196                     continue
197                 last_request_time, count = requested.get(share_hash, (None, 0))
198                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
199                     continue
200                 potential_peers = set()
201                 for head in tracker.tails[share_hash]:
202                     potential_peers.update(peer_heads.get(head, set()))
203                 potential_peers = [peer for peer in potential_peers if peer.connected2]
204                 if count == 0 and peer2 is not None and peer2.connected2:
205                     peer = peer2
206                 else:
207                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
208                     if peer is None:
209                         continue
210                 
211                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
212                 peer.send_getshares(
213                     hashes=[share_hash],
214                     parents=2000,
215                     stops=list(set(tracker.heads) | set(
216                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
217                     ))[:100],
218                 )
219                 requested[share_hash] = t, count + 1
220         pre_current_work.changed.watch(lambda _: set_real_work2())
221         pre_merged_work.changed.watch(lambda _: set_real_work2())
222         set_real_work2()
223         print '    ...success!'
224         print
225         
226         
227         @defer.inlineCallbacks
228         def set_merged_work(merged_url, merged_userpass):
229             merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
230             while True:
231                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
232                 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
233                     hash=int(auxblock['hash'], 16),
234                     target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
235                     merged_proxy=merged_proxy,
236                 )}))
237                 yield deferral.sleep(1)
238         for merged_url, merged_userpass in merged_urls:
239             set_merged_work(merged_url, merged_userpass)
240         
241         @pre_merged_work.changed.watch
242         def _(new_merged_work):
243             print 'Got new merged mining work!'
244         
245         # setup p2p logic and join p2pool network
246         
247         class Node(p2p.Node):
248             def handle_shares(self, shares, peer):
249                 if len(shares) > 5:
250                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
251                 
252                 new_count = 0
253                 for share in shares:
254                     if share.hash in tracker.shares:
255                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
256                         continue
257                     
258                     new_count += 1
259                     
260                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
261                     
262                     tracker.add(share)
263                 
264                 if shares and peer is not None:
265                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
266                 
267                 if new_count:
268                     set_real_work2()
269                 
270                 if len(shares) > 5:
271                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
272             
273             def handle_share_hashes(self, hashes, peer):
274                 t = time.time()
275                 get_hashes = []
276                 for share_hash in hashes:
277                     if share_hash in tracker.shares:
278                         continue
279                     last_request_time, count = requested.get(share_hash, (None, 0))
280                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
281                         continue
282                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
283                     get_hashes.append(share_hash)
284                     requested[share_hash] = t, count + 1
285                 
286                 if hashes and peer is not None:
287                     peer_heads.setdefault(hashes[0], set()).add(peer)
288                 if get_hashes:
289                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
290             
291             def handle_get_shares(self, hashes, parents, stops, peer):
292                 parents = min(parents, 1000//len(hashes))
293                 stops = set(stops)
294                 shares = []
295                 for share_hash in hashes:
296                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
297                         if share.hash in stops:
298                             break
299                         shares.append(share)
300                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
301                 peer.sendShares(shares)
302         
303         @deferral.retry('Error submitting block: (will retry)', 10, 10)
304         @defer.inlineCallbacks
305         def submit_block(block, ignore_failure):
306             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
307             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
308             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
309                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
310         
311         @tracker.verified.added.watch
312         def _(share):
313             if share.pow_hash <= share.header['bits'].target:
314                 submit_block(share.as_block(tracker), ignore_failure=True)
315                 print
316                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
317                 print
318                 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
319         
320         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
321         
322         @defer.inlineCallbacks
323         def parse(x):
324             if ':' in x:
325                 ip, port = x.split(':')
326                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
327             else:
328                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
329         
330         addrs = {}
331         if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
332             try:
333                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
334             except:
335                 print >>sys.stderr, "error reading addrs"
336         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
337             try:
338                 addr = yield addr_df
339                 if addr not in addrs:
340                     addrs[addr] = (0, time.time(), time.time())
341             except:
342                 log.err()
343         
344         connect_addrs = set()
345         for addr_df in map(parse, args.p2pool_nodes):
346             try:
347                 connect_addrs.add((yield addr_df))
348             except:
349                 log.err()
350         
351         p2p_node = Node(
352             best_share_hash_func=lambda: current_work.value['best_share_hash'],
353             port=args.p2pool_port,
354             net=net,
355             addr_store=addrs,
356             connect_addrs=connect_addrs,
357         )
358         p2p_node.start()
359         
360         task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
361         
362         # send share when the chain changes to their chain
363         def work_changed(new_work):
364             #print 'Work changed:', new_work
365             shares = []
366             for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
367                 if share.hash in shared_share_hashes:
368                     break
369                 shared_share_hashes.add(share.hash)
370                 shares.append(share)
371             
372             for peer in p2p_node.peers.itervalues():
373                 peer.sendShares([share for share in shares if share.peer is not peer])
374         
375         current_work.changed.watch(work_changed)
376         
377         def save_shares():
378             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
379                 ss.add_share(share)
380                 if share.hash in tracker.verified.shares:
381                     ss.add_verified_hash(share.hash)
382         task.LoopingCall(save_shares).start(60)
383         
384         print '    ...success!'
385         print
386         
387         if args.upnp:
388             @defer.inlineCallbacks
389             def upnp_thread():
390                 while True:
391                     try:
392                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
393                         if is_lan:
394                             pm = yield portmapper.get_port_mapper()
395                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
396                     except defer.TimeoutError:
397                         pass
398                     except:
399                         if p2pool.DEBUG:
400                             log.err(None, 'UPnP error:')
401                     yield deferral.sleep(random.expovariate(1/120))
402             upnp_thread()
403         
404         # start listening for workers with a JSON-RPC server
405         
406         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
407         
408         if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
409             with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
410                 vip_pass = f.read().strip('\r\n')
411         else:
412             vip_pass = '%016x' % (random.randrange(2**64),)
413             with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
414                 f.write(vip_pass)
415         print '    Worker password:', vip_pass, '(only required for generating graphs)'
416         
417         # setup worker logic
418         
419         removed_unstales_var = variable.Variable((0, 0, 0))
420         removed_doa_unstales_var = variable.Variable(0)
421         @tracker.verified.removed.watch
422         def _(share):
423             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
424                 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
425                 removed_unstales_var.set((
426                     removed_unstales_var.value[0] + 1,
427                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
428                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
429                 ))
430             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
431                 removed_doa_unstales.set(removed_doa_unstales.value + 1)
432         
433         def get_stale_counts():
434             '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
435             my_shares = len(my_share_hashes)
436             my_doa_shares = len(my_doa_share_hashes)
437             delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
438             my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
439             my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
440             orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
441             doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
442             
443             my_shares_not_in_chain = my_shares - my_shares_in_chain
444             my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
445             
446             return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
447         
448         
449         pseudoshare_received = variable.Event()
450         local_rate_monitor = math.RateMonitor(10*60)
451         
452         class WorkerBridge(worker_interface.WorkerBridge):
453             def __init__(self):
454                 worker_interface.WorkerBridge.__init__(self)
455                 self.new_work_event = current_work.changed
456                 self.recent_shares_ts_work = []
457             
458             def preprocess_request(self, request):
459                 user = request.getUser() if request.getUser() is not None else ''
460                 
461                 desired_pseudoshare_target = None
462                 if '+' in user:
463                     user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
464                     try:
465                         desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
466                     except:
467                         pass
468                 
469                 desired_share_target = 2**256 - 1
470                 if '/' in user:
471                     user, min_diff_str = user.rsplit('/', 1)
472                     try:
473                         desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
474                     except:
475                         pass
476                 
477                 if random.uniform(0, 100) < args.worker_fee:
478                     pubkey_hash = my_pubkey_hash
479                 else:
480                     try:
481                         pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
482                     except: # XXX blah
483                         pubkey_hash = my_pubkey_hash
484                 
485                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
486             
487             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
488                 if len(p2p_node.peers) == 0 and net.PERSIST:
489                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
490                 if current_work.value['best_share_hash'] is None and net.PERSIST:
491                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
492                 if time.time() > current_work2.value['last_update'] + 60:
493                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
494                 
495                 if current_work.value['mm_chains']:
496                     tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
497                     mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
498                     mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
499                         merkle_root=bitcoin_data.merkle_hash(mm_hashes),
500                         size=size,
501                         nonce=0,
502                     ))
503                     mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
504                 else:
505                     mm_data = ''
506                     mm_later = []
507                 
508                 share_info, generate_tx = p2pool_data.generate_transaction(
509                     tracker=tracker,
510                     share_data=dict(
511                         previous_share_hash=current_work.value['best_share_hash'],
512                         coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
513                         nonce=random.randrange(2**32),
514                         pubkey_hash=pubkey_hash,
515                         subsidy=current_work2.value['subsidy'],
516                         donation=math.perfect_round(65535*args.donation_percentage/100),
517                         stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
518                             253 if orphans > orphans_recorded_in_chain else
519                             254 if doas > doas_recorded_in_chain else
520                             0
521                         )(*get_stale_counts()),
522                     ),
523                     block_target=current_work.value['bits'].target,
524                     desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
525                     desired_target=desired_share_target,
526                     net=net,
527                 )
528                 
529                 target = net.PARENT.SANE_MAX_TARGET
530                 if desired_pseudoshare_target is None:
531                     if len(self.recent_shares_ts_work) == 50:
532                         hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
533                         target = min(target, 2**256//hash_rate)
534                 else:
535                     target = min(target, desired_pseudoshare_target)
536                 target = max(target, share_info['bits'].target)
537                 for aux_work in current_work.value['mm_chains'].itervalues():
538                     target = max(target, aux_work['target'])
539                 
540                 transactions = [generate_tx] + list(current_work2.value['transactions'])
541                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
542                 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
543                 
544                 getwork_time = time.time()
545                 merkle_branch = current_work2.value['merkle_branch']
546                 
547                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
548                     bitcoin_data.target_to_difficulty(target),
549                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
550                     current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
551                     len(current_work2.value['transactions']),
552                 )
553                 
554                 ba = bitcoin_getwork.BlockAttempt(
555                     version=current_work.value['version'],
556                     previous_block=current_work.value['previous_block'],
557                     merkle_root=merkle_root,
558                     timestamp=current_work2.value['time'],
559                     bits=current_work.value['bits'],
560                     share_target=target,
561                 )
562                 
563                 received_header_hashes = set()
564                 
565                 def got_response(header, request):
566                     assert header['merkle_root'] == merkle_root
567                     
568                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
569                     pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
570                     on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
571                     
572                     try:
573                         if pow_hash <= header['bits'].target or p2pool.DEBUG:
574                             submit_block(dict(header=header, txs=transactions), ignore_failure=False)
575                             if pow_hash <= header['bits'].target:
576                                 print
577                                 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
578                                 print
579                                 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
580                     except:
581                         log.err(None, 'Error while processing potential block:')
582                     
583                     for aux_work, index, hashes in mm_later:
584                         try:
585                             if pow_hash <= aux_work['target'] or p2pool.DEBUG:
586                                 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
587                                     pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
588                                     bitcoin_data.aux_pow_type.pack(dict(
589                                         merkle_tx=dict(
590                                             tx=transactions[0],
591                                             block_hash=header_hash,
592                                             merkle_branch=merkle_branch,
593                                             index=0,
594                                         ),
595                                         merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
596                                         index=index,
597                                         parent_block_header=header,
598                                     )).encode('hex'),
599                                 )
600                                 @df.addCallback
601                                 def _(result):
602                                     if result != (pow_hash <= aux_work['target']):
603                                         print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
604                                     else:
605                                         print 'Merged block submittal result: %s' % (result,)
606                                 @df.addErrback
607                                 def _(err):
608                                     log.err(err, 'Error submitting merged block:')
609                         except:
610                             log.err(None, 'Error while processing merged mining POW:')
611                     
612                     if pow_hash <= share_info['bits'].target:
613                         min_header = dict(header);del min_header['merkle_root']
614                         hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
615                         share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
616                         
617                         print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
618                             request.getUser(),
619                             p2pool_data.format_hash(share.hash),
620                             p2pool_data.format_hash(share.previous_hash),
621                             time.time() - getwork_time,
622                             ' DEAD ON ARRIVAL' if not on_time else '',
623                         )
624                         my_share_hashes.add(share.hash)
625                         if not on_time:
626                             my_doa_share_hashes.add(share.hash)
627                         
628                         tracker.add(share)
629                         if not p2pool.DEBUG:
630                             tracker.verified.add(share)
631                         set_real_work2()
632                         
633                         try:
634                             if pow_hash <= header['bits'].target or p2pool.DEBUG:
635                                 for peer in p2p_node.peers.itervalues():
636                                     peer.sendShares([share])
637                                 shared_share_hashes.add(share.hash)
638                         except:
639                             log.err(None, 'Error forwarding block solution:')
640                     
641                     if pow_hash > target:
642                         print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
643                         print '    Hash:   %56x' % (pow_hash,)
644                         print '    Target: %56x' % (target,)
645                     elif header_hash in received_header_hashes:
646                         print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
647                     else:
648                         received_header_hashes.add(header_hash)
649                         
650                         pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
651                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
652                         while len(self.recent_shares_ts_work) > 50:
653                             self.recent_shares_ts_work.pop(0)
654                         local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
655                     
656                     return on_time
657                 
658                 return ba, got_response
659         
660         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
661         
662         web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
663         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
664         
665         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
666         
667         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
668             pass
669         
670         print '    ...success!'
671         print
672         
673         
674         @defer.inlineCallbacks
675         def work_poller():
676             while True:
677                 flag = factory.new_block.get_deferred()
678                 try:
679                     yield set_real_work1()
680                 except:
681                     log.err()
682                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
683         work_poller()
684         
685         
686         # done!
687         print 'Started successfully!'
688         print
689         
690         
691         if hasattr(signal, 'SIGALRM'):
692             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
693                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
694             ))
695             signal.siginterrupt(signal.SIGALRM, False)
696             task.LoopingCall(signal.alarm, 30).start(1)
697         
698         if args.irc_announce:
699             from twisted.words.protocols import irc
700             class IRCClient(irc.IRCClient):
701                 nickname = 'p2pool%02i' % (random.randrange(100),)
702                 channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
703                 def lineReceived(self, line):
704                     print repr(line)
705                     irc.IRCClient.lineReceived(self, line)
706                 def signedOn(self):
707                     irc.IRCClient.signedOn(self)
708                     self.factory.resetDelay()
709                     self.join(self.channel)
710                     self.watch_id = tracker.verified.added.watch(self._new_share)
711                     self.announced_hashes = set()
712                     self.delayed_messages = {}
713                 def privmsg(self, user, channel, message):
714                     if channel == self.channel and message in self.delayed_messages:
715                         self.delayed_messages.pop(message).cancel()
716                 def _new_share(self, share):
717                     if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
718                         self.announced_hashes.add(share.header_hash)
719                         message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
720                         self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
721                 def connectionLost(self, reason):
722                     tracker.verified.added.unwatch(self.watch_id)
723                     print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                # Protocol class used for connections made through this factory.
                # Inside this class body the attribute shadows the twisted `protocol`
                # module, which is only needed on the line above.
                protocol = IRCClient
726             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
727         
728         @defer.inlineCallbacks
729         def status_thread():
730             last_str = None
731             last_time = 0
732             while True:
733                 yield deferral.sleep(3)
734                 try:
735                     if time.time() > current_work2.value['last_update'] + 60:
736                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
737                     
738                     height = tracker.get_height(current_work.value['best_share_hash'])
739                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
740                         height,
741                         len(tracker.verified.shares),
742                         len(tracker.shares),
743                         len(p2p_node.peers),
744                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
745                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
746                     
747                     datums, dt = local_rate_monitor.get_datums_in_last()
748                     my_att_s = sum(datum['work']/dt for datum in datums)
749                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
750                         math.format(int(my_att_s)),
751                         math.format_dt(dt),
752                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
753                         math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
754                     )
755                     
756                     if height > 2:
757                         (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
758                         stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
759                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
760                         
761                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
762                             shares, stale_orphan_shares, stale_doa_shares,
763                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
764                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
765                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
766                         )
767                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
768                             math.format(int(real_att_s)),
769                             100*stale_prop,
770                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
771                         )
772                     
773                     if this_str != last_str or time.time() > last_time + 15:
774                         print this_str
775                         last_str = this_str
776                         last_time = time.time()
777                 except:
778                     log.err()
779         status_thread()
780     except:
781         log.err(None, 'Fatal error:')
782         reactor.stop()
783
784 def run():
785     class FixedArgumentParser(argparse.ArgumentParser):
786         def _read_args_from_files(self, arg_strings):
787             # expand arguments referencing files
788             new_arg_strings = []
789             for arg_string in arg_strings:
790                 
791                 # for regular arguments, just add them back into the list
792                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
793                     new_arg_strings.append(arg_string)
794                 
795                 # replace arguments referencing files with the file content
796                 else:
797                     try:
798                         args_file = open(arg_string[1:])
799                         try:
800                             arg_strings = []
801                             for arg_line in args_file.read().splitlines():
802                                 for arg in self.convert_arg_line_to_args(arg_line):
803                                     arg_strings.append(arg)
804                             arg_strings = self._read_args_from_files(arg_strings)
805                             new_arg_strings.extend(arg_strings)
806                         finally:
807                             args_file.close()
808                     except IOError:
809                         err = sys.exc_info()[1]
810                         self.error(str(err))
811             
812             # return the modified argument list
813             return new_arg_strings
814         
815         def convert_arg_line_to_args(self, arg_line):
816             return [arg for arg in arg_line.split() if arg.strip()]
817     
818     
819     realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
820     
821     parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
822     parser.add_argument('--version', action='version', version=p2pool.__version__)
823     parser.add_argument('--net',
824         help='use specified network (default: bitcoin)',
825         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
826     parser.add_argument('--testnet',
827         help='''use the network's testnet''',
828         action='store_const', const=True, default=False, dest='testnet')
829     parser.add_argument('--debug',
830         help='enable debugging mode',
831         action='store_const', const=True, default=False, dest='debug')
832     parser.add_argument('-a', '--address',
833         help='generate payouts to this address (default: <address requested from bitcoind>)',
834         type=str, action='store', default=None, dest='address')
835     parser.add_argument('--datadir',
836         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
837         type=str, action='store', default=None, dest='datadir')
838     parser.add_argument('--logfile',
839         help='''log to this file (default: data/<NET>/log)''',
840         type=str, action='store', default=None, dest='logfile')
841     parser.add_argument('--merged',
842         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
843         type=str, action='append', default=[], dest='merged_urls')
844     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
845         help='donate this percentage of work to author of p2pool (default: 0.5)',
846         type=float, action='store', default=0.5, dest='donation_percentage')
847     parser.add_argument('--irc-announce',
848         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
849         action='store_true', default=False, dest='irc_announce')
850     
851     p2pool_group = parser.add_argument_group('p2pool interface')
852     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
853         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
854         type=int, action='store', default=None, dest='p2pool_port')
855     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
856         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
857         type=str, action='append', default=[], dest='p2pool_nodes')
858     parser.add_argument('--disable-upnp',
859         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
860         action='store_false', default=True, dest='upnp')
861     
862     worker_group = parser.add_argument_group('worker interface')
863     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
864         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
865         type=str, action='store', default=None, dest='worker_endpoint')
866     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
867         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
868         type=float, action='store', default=0, dest='worker_fee')
869     
870     bitcoind_group = parser.add_argument_group('bitcoind interface')
871     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
872         help='connect to this address (default: 127.0.0.1)',
873         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
874     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
875         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
876         type=int, action='store', default=None, dest='bitcoind_rpc_port')
877     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
878         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
879         type=int, action='store', default=None, dest='bitcoind_p2p_port')
880     
881     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
882         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
883         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
884     
885     args = parser.parse_args()
886     
887     if args.debug:
888         p2pool.DEBUG = True
889     
890     net_name = args.net_name + ('_testnet' if args.testnet else '')
891     net = networks.nets[net_name]
892     
893     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
894     if not os.path.exists(datadir_path):
895         os.makedirs(datadir_path)
896     
897     if len(args.bitcoind_rpc_userpass) > 2:
898         parser.error('a maximum of two arguments are allowed')
899     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
900     
901     if args.bitcoind_rpc_password is None:
902         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
903             parser.error('This network has no configuration file function. Manually enter your RPC password.')
904         conf_path = net.PARENT.CONF_FILE_FUNC()
905         if not os.path.exists(conf_path):
906             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
907                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
908                 '''\r\n'''
909                 '''server=1\r\n'''
910                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
911         with open(conf_path, 'rb') as f:
912             cp = ConfigParser.RawConfigParser()
913             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
914             for conf_name, var_name, var_type in [
915                 ('rpcuser', 'bitcoind_rpc_username', str),
916                 ('rpcpassword', 'bitcoind_rpc_password', str),
917                 ('rpcport', 'bitcoind_rpc_port', int),
918                 ('port', 'bitcoind_p2p_port', int),
919             ]:
920                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
921                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
922         if args.bitcoind_rpc_password is None:
923             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
924     
925     if args.bitcoind_rpc_username is None:
926         args.bitcoind_rpc_username = ''
927     
928     if args.bitcoind_rpc_port is None:
929         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
930     
931     if args.bitcoind_p2p_port is None:
932         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
933     
934     if args.p2pool_port is None:
935         args.p2pool_port = net.P2P_PORT
936     
937     if args.worker_endpoint is None:
938         worker_endpoint = '', net.WORKER_PORT
939     elif ':' not in args.worker_endpoint:
940         worker_endpoint = '', int(args.worker_endpoint)
941     else:
942         addr, port = args.worker_endpoint.rsplit(':', 1)
943         worker_endpoint = addr, int(port)
944     
945     if args.address is not None:
946         try:
947             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
948         except Exception, e:
949             parser.error('error parsing address: ' + repr(e))
950     else:
951         args.pubkey_hash = None
952     
953     def separate_url(url):
954         s = urlparse.urlsplit(url)
955         if '@' not in s.netloc:
956             parser.error('merged url netloc must contain an "@"')
957         userpass, new_netloc = s.netloc.rsplit('@', 1)
958         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
959     merged_urls = map(separate_url, args.merged_urls)
960     
961     if args.logfile is None:
962         args.logfile = os.path.join(datadir_path, 'log')
963     
964     logfile = logging.LogFile(args.logfile)
965     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
966     sys.stdout = logging.AbortPipe(pipe)
967     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
968     if hasattr(signal, "SIGUSR1"):
969         def sigusr1(signum, frame):
970             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
971             logfile.reopen()
972             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
973         signal.signal(signal.SIGUSR1, sigusr1)
974     task.LoopingCall(logfile.reopen).start(5)
975     
976     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
977     reactor.run()