don't retry getmemorypool block submittal if it returns false
p2pool/main.py
from __future__ import division

import ConfigParser
import StringIO
import argparse
import os
import random
import struct
import sys
import time
import signal
import traceback
import urlparse

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

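# getwork: poll bitcoind's getmemorypool RPC for a block template and normalize
# it into the dict the rest of p2pool consumes (decoded transactions, a merkle
# branch for the coinbase slot, bits as a FloatingInteger, coinbase flags).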
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
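        # Work flows through two stages: set_real_work1 polls bitcoind and fills
        # current_work2 and pre_current_work; set_real_work2 merges in share-chain
        # and merged-mining state and publishes current_work, whose changes drive
        # long-polling updates to miners.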
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # set current_work2 first, because watchers fire off pre_current_work and read current_work2 when they do
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
            height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((lambda x: x['blockcount'] if 'blockcount' in x else x['height'])((yield bitcoind.rpc_getblock('%x' % (block_hash,)))))))
            best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
            def get_height_rel_highest(block_hash):
                this_height = height_cacher.call_now(block_hash, 0)
                best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
                best_height_cached.set(max(best_height_cached.value, this_height, best_height))
                return this_height - best_height_cached.value
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
        
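        # Recompute the best share and ask peers for any parent shares we're
        # missing, backing off exponentially (10 * 1.5**count seconds) on
        # repeated requests for the same hash.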
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
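        # Merged mining: poll each merged daemon's getauxblock once a second and
        # record the aux work (hash, target, proxy) keyed by chain id.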
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
        
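        # submit_block passes the hex-encoded block to getmemorypool, whose
        # boolean result indicates acceptance. deferral.retry only re-runs on
        # exceptions, so an unexpected False result is reported below but
        # deliberately not retried.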
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
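        # Stale accounting: shares we generated minus those that landed in the
        # chain are the ones lost to orphaning or dead-on-arrival; the
        # *_recorded_in_chain figures come from stale_info flags carried in the
        # shares themselves.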
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
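                # Miner usernames may carry suffixes: ADDRESS+DIFF pins the
                # pseudoshare difficulty and ADDRESS/DIFF sets a minimum share
                # difficulty; both are stripped off before the address is parsed.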
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # not a valid address; fall back to our own payout address
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
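                # Build the share: generate_transaction constructs the coinbase
                # transaction (embedding the merged-mining commitment, if any) plus
                # the share metadata, marking it 253/254 when our orphan/DOA counts
                # exceed what the chain has recorded so far.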
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    net=net,
                )
                
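                # Pick the pseudoshare target: estimate the miner's hashrate from
                # its last 50 submissions when no difficulty was requested, then
                # clamp so a pseudoshare is never harder to find than a real share
                # or any merged-mining block.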
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
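                # got_response handles each solution a miner submits: it checks the
                # proof-of-work against the block target, every merged-mining
                # target, and the share target, acting on whichever it satisfies.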
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        min_header = dict(header)
                        del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
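        # Re-poll bitcoind for fresh work whenever its p2p connection announces a
        # new block, or after 15 seconds at the latest.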
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print
        
        
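        # Watchdog: arm a 30-second SIGALRM once a second; if the reactor ever
        # stalls long enough for the alarm to fire, the handler dumps a stack
        # trace to stderr.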
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
        reactor.stop()

def run():
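    # Overrides argparse's @file handling: each line in an arguments file is
    # split on whitespace (stock argparse treats a whole line as one argument)
    # and nested @file references are expanded recursively.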
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to the author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
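    # No RPC password on the command line: fall back to reading rpcuser,
    # rpcpassword, and the ports from the parent network's bitcoin.conf.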
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()