increased announce wait time to one minute
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import os
7 import random
8 import struct
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
19
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
25
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
29     try:
30         work = yield bitcoind.rpc_getmemorypool()
31     except jsonrpc.Error, e:
32         if e.code == -32601: # Method not found
33             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34             raise deferral.RetrySilentlyException()
35         raise
36     packed_transactions = [x.decode('hex') for x in work['transactions']]
37     defer.returnValue(dict(
38         version=work['version'],
39         previous_block_hash=int(work['previousblockhash'], 16),
40         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41         merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
42         subsidy=work['coinbasevalue'],
43         time=work['time'],
44         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
46     ))
47
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
50     try:
51         print 'p2pool (version %s)' % (p2pool.__version__,)
52         print
53         
54         # connect to bitcoind over JSON-RPC and do initial getmemorypool
55         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
58         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
59         if not good:
60             print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
61             return
62         temp_work = yield getwork(bitcoind)
63         print '    ...success!'
64         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
65         print
66         
67         # connect to bitcoind over bitcoin-p2p
68         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
69         factory = bitcoin_p2p.ClientFactory(net.PARENT)
70         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
71         yield factory.getProtocol() # waits until handshake is successful
72         print '    ...success!'
73         print
74         
75         print 'Determining payout address...'
76         if args.pubkey_hash is None:
77             address_path = os.path.join(datadir_path, 'cached_payout_address')
78             
79             if os.path.exists(address_path):
80                 with open(address_path, 'rb') as f:
81                     address = f.read().strip('\r\n')
82                 print '    Loaded cached address: %s...' % (address,)
83             else:
84                 address = None
85             
86             if address is not None:
87                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
88                 if not res['isvalid'] or not res['ismine']:
89                     print '    Cached address is either invalid or not controlled by local bitcoind!'
90                     address = None
91             
92             if address is None:
93                 print '    Getting payout address from bitcoind...'
94                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
95             
96             with open(address_path, 'wb') as f:
97                 f.write(address)
98             
99             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
100         else:
101             my_pubkey_hash = args.pubkey_hash
102         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
103         print
104         
105         my_share_hashes = set()
106         my_doa_share_hashes = set()
107         
108         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
109         shared_share_hashes = set()
110         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
111         known_verified = set()
112         recent_blocks = []
113         print "Loading shares..."
114         for i, (mode, contents) in enumerate(ss.get_shares()):
115             if mode == 'share':
116                 if contents.hash in tracker.shares:
117                     continue
118                 shared_share_hashes.add(contents.hash)
119                 contents.time_seen = 0
120                 tracker.add(contents)
121                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
122                     print "    %i" % (len(tracker.shares),)
123             elif mode == 'verified_hash':
124                 known_verified.add(contents)
125             else:
126                 raise AssertionError()
127         print "    ...inserting %i verified shares..." % (len(known_verified),)
128         for h in known_verified:
129             if h not in tracker.shares:
130                 ss.forget_verified_share(h)
131                 continue
132             tracker.verified.add(tracker.shares[h])
133         print "    ...done loading %i shares!" % (len(tracker.shares),)
134         print
135         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
136         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
137         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
138         
139         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
140         
141         pre_current_work = variable.Variable(None)
142         pre_merged_work = variable.Variable({})
143         # information affecting work that should trigger a long-polling update
144         current_work = variable.Variable(None)
145         # information affecting work that should not trigger a long-polling update
146         current_work2 = variable.Variable(None)
147         
148         requested = expiring_dict.ExpiringDict(300)
149         
150         print 'Initializing work...'
151         @defer.inlineCallbacks
152         def set_real_work1():
153             work = yield getwork(bitcoind)
154             current_work2.set(dict(
155                 time=work['time'],
156                 transactions=work['transactions'],
157                 merkle_branch=work['merkle_branch'],
158                 subsidy=work['subsidy'],
159                 clock_offset=time.time() - work['time'],
160                 last_update=time.time(),
161             )) # second set first because everything hooks on the first
162             pre_current_work.set(dict(
163                 version=work['version'],
164                 previous_block=work['previous_block_hash'],
165                 bits=work['bits'],
166                 coinbaseflags=work['coinbaseflags'],
167             ))
168         yield set_real_work1()
169         
170         if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
171             @deferral.DeferredCacher
172             @defer.inlineCallbacks
173             def height_cacher(block_hash):
174                 try:
175                     x = yield bitcoind.rpc_getblock('%x' % (block_hash,))
176                 except jsonrpc.Error, e:
177                     if e.code == -5 and not p2pool.DEBUG:
178                         raise deferral.RetrySilentlyException()
179                     raise
180                 defer.returnValue(x['blockcount'] if 'blockcount' in x else x['height'])
181             best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
182             def get_height_rel_highest(block_hash):
183                 this_height = height_cacher.call_now(block_hash, 0)
184                 best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
185                 best_height_cached.set(max(best_height_cached.value, this_height, best_height))
186                 return this_height - best_height_cached.value
187         else:
188             get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
189         
190         def set_real_work2():
191             best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
192             
193             t = dict(pre_current_work.value)
194             t['best_share_hash'] = best
195             t['mm_chains'] = pre_merged_work.value
196             current_work.set(t)
197             
198             t = time.time()
199             for peer2, share_hash in desired:
200                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
201                     continue
202                 last_request_time, count = requested.get(share_hash, (None, 0))
203                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
204                     continue
205                 potential_peers = set()
206                 for head in tracker.tails[share_hash]:
207                     potential_peers.update(peer_heads.get(head, set()))
208                 potential_peers = [peer for peer in potential_peers if peer.connected2]
209                 if count == 0 and peer2 is not None and peer2.connected2:
210                     peer = peer2
211                 else:
212                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
213                     if peer is None:
214                         continue
215                 
216                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
217                 peer.send_getshares(
218                     hashes=[share_hash],
219                     parents=2000,
220                     stops=list(set(tracker.heads) | set(
221                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
222                     ))[:100],
223                 )
224                 requested[share_hash] = t, count + 1
225         pre_current_work.changed.watch(lambda _: set_real_work2())
226         pre_merged_work.changed.watch(lambda _: set_real_work2())
227         set_real_work2()
228         print '    ...success!'
229         print
230         
231         
232         @defer.inlineCallbacks
233         def set_merged_work(merged_url, merged_userpass):
234             merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
235             while True:
236                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
237                 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
238                     hash=int(auxblock['hash'], 16),
239                     target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
240                     merged_proxy=merged_proxy,
241                 )}))
242                 yield deferral.sleep(1)
243         for merged_url, merged_userpass in merged_urls:
244             set_merged_work(merged_url, merged_userpass)
245         
246         @pre_merged_work.changed.watch
247         def _(new_merged_work):
248             print 'Got new merged mining work!'
249         
250         # setup p2p logic and join p2pool network
251         
252         class Node(p2p.Node):
253             def handle_shares(self, shares, peer):
254                 if len(shares) > 5:
255                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
256                 
257                 new_count = 0
258                 for share in shares:
259                     if share.hash in tracker.shares:
260                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
261                         continue
262                     
263                     new_count += 1
264                     
265                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
266                     
267                     tracker.add(share)
268                 
269                 if shares and peer is not None:
270                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
271                 
272                 if new_count:
273                     set_real_work2()
274                 
275                 if len(shares) > 5:
276                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
277             
278             def handle_share_hashes(self, hashes, peer):
279                 t = time.time()
280                 get_hashes = []
281                 for share_hash in hashes:
282                     if share_hash in tracker.shares:
283                         continue
284                     last_request_time, count = requested.get(share_hash, (None, 0))
285                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
286                         continue
287                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
288                     get_hashes.append(share_hash)
289                     requested[share_hash] = t, count + 1
290                 
291                 if hashes and peer is not None:
292                     peer_heads.setdefault(hashes[0], set()).add(peer)
293                 if get_hashes:
294                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
295             
296             def handle_get_shares(self, hashes, parents, stops, peer):
297                 parents = min(parents, 1000//len(hashes))
298                 stops = set(stops)
299                 shares = []
300                 for share_hash in hashes:
301                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
302                         if share.hash in stops:
303                             break
304                         shares.append(share)
305                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
306                 peer.sendShares(shares)
307         
308         @deferral.retry('Error submitting block: (will retry)', 10, 10)
309         @defer.inlineCallbacks
310         def submit_block(block, ignore_failure):
311             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
312             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
313             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
314                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
315         
316         @tracker.verified.added.watch
317         def _(share):
318             if share.pow_hash <= share.header['bits'].target:
319                 submit_block(share.as_block(tracker), ignore_failure=True)
320                 print
321                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
322                 print
323                 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
324         
325         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
326         
327         @defer.inlineCallbacks
328         def parse(x):
329             if ':' in x:
330                 ip, port = x.split(':')
331                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
332             else:
333                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
334         
335         addrs = {}
336         if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
337             try:
338                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
339             except:
340                 print >>sys.stderr, "error reading addrs"
341         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
342             try:
343                 addr = yield addr_df
344                 if addr not in addrs:
345                     addrs[addr] = (0, time.time(), time.time())
346             except:
347                 log.err()
348         
349         connect_addrs = set()
350         for addr_df in map(parse, args.p2pool_nodes):
351             try:
352                 connect_addrs.add((yield addr_df))
353             except:
354                 log.err()
355         
356         p2p_node = Node(
357             best_share_hash_func=lambda: current_work.value['best_share_hash'],
358             port=args.p2pool_port,
359             net=net,
360             addr_store=addrs,
361             connect_addrs=connect_addrs,
362         )
363         p2p_node.start()
364         
365         task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
366         
367         # send share when the chain changes to their chain
368         def work_changed(new_work):
369             #print 'Work changed:', new_work
370             shares = []
371             for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
372                 if share.hash in shared_share_hashes:
373                     break
374                 shared_share_hashes.add(share.hash)
375                 shares.append(share)
376             
377             for peer in p2p_node.peers.itervalues():
378                 peer.sendShares([share for share in shares if share.peer is not peer])
379         
380         current_work.changed.watch(work_changed)
381         
382         def save_shares():
383             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
384                 ss.add_share(share)
385                 if share.hash in tracker.verified.shares:
386                     ss.add_verified_hash(share.hash)
387         task.LoopingCall(save_shares).start(60)
388         
389         print '    ...success!'
390         print
391         
392         if args.upnp:
393             @defer.inlineCallbacks
394             def upnp_thread():
395                 while True:
396                     try:
397                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
398                         if is_lan:
399                             pm = yield portmapper.get_port_mapper()
400                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
401                     except defer.TimeoutError:
402                         pass
403                     except:
404                         if p2pool.DEBUG:
405                             log.err(None, 'UPnP error:')
406                     yield deferral.sleep(random.expovariate(1/120))
407             upnp_thread()
408         
409         # start listening for workers with a JSON-RPC server
410         
411         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
412         
413         if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
414             with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
415                 vip_pass = f.read().strip('\r\n')
416         else:
417             vip_pass = '%016x' % (random.randrange(2**64),)
418             with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
419                 f.write(vip_pass)
420         print '    Worker password:', vip_pass, '(only required for generating graphs)'
421         
422         # setup worker logic
423         
424         removed_unstales_var = variable.Variable((0, 0, 0))
425         removed_doa_unstales_var = variable.Variable(0)
426         @tracker.verified.removed.watch
427         def _(share):
428             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
429                 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
430                 removed_unstales_var.set((
431                     removed_unstales_var.value[0] + 1,
432                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
433                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
434                 ))
435             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
436                 removed_doa_unstales.set(removed_doa_unstales.value + 1)
437         
438         def get_stale_counts():
439             '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
440             my_shares = len(my_share_hashes)
441             my_doa_shares = len(my_doa_share_hashes)
442             delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
443             my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
444             my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
445             orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
446             doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
447             
448             my_shares_not_in_chain = my_shares - my_shares_in_chain
449             my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
450             
451             return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
452         
453         
454         pseudoshare_received = variable.Event()
455         local_rate_monitor = math.RateMonitor(10*60)
456         
457         class WorkerBridge(worker_interface.WorkerBridge):
458             def __init__(self):
459                 worker_interface.WorkerBridge.__init__(self)
460                 self.new_work_event = current_work.changed
461                 self.recent_shares_ts_work = []
462             
463             def preprocess_request(self, request):
464                 user = request.getUser() if request.getUser() is not None else ''
465                 
466                 desired_pseudoshare_target = None
467                 if '+' in user:
468                     user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
469                     try:
470                         desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
471                     except:
472                         pass
473                 
474                 desired_share_target = 2**256 - 1
475                 if '/' in user:
476                     user, min_diff_str = user.rsplit('/', 1)
477                     try:
478                         desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
479                     except:
480                         pass
481                 
482                 if random.uniform(0, 100) < args.worker_fee:
483                     pubkey_hash = my_pubkey_hash
484                 else:
485                     try:
486                         pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
487                     except: # XXX blah
488                         pubkey_hash = my_pubkey_hash
489                 
490                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
491             
492             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
493                 if len(p2p_node.peers) == 0 and net.PERSIST:
494                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
495                 if current_work.value['best_share_hash'] is None and net.PERSIST:
496                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
497                 if time.time() > current_work2.value['last_update'] + 60:
498                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
499                 
500                 if current_work.value['mm_chains']:
501                     tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
502                     mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
503                     mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
504                         merkle_root=bitcoin_data.merkle_hash(mm_hashes),
505                         size=size,
506                         nonce=0,
507                     ))
508                     mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
509                 else:
510                     mm_data = ''
511                     mm_later = []
512                 
513                 share_info, generate_tx = p2pool_data.generate_transaction(
514                     tracker=tracker,
515                     share_data=dict(
516                         previous_share_hash=current_work.value['best_share_hash'],
517                         coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
518                         nonce=random.randrange(2**32),
519                         pubkey_hash=pubkey_hash,
520                         subsidy=current_work2.value['subsidy'],
521                         donation=math.perfect_round(65535*args.donation_percentage/100),
522                         stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
523                             253 if orphans > orphans_recorded_in_chain else
524                             254 if doas > doas_recorded_in_chain else
525                             0
526                         )(*get_stale_counts()),
527                     ),
528                     block_target=current_work.value['bits'].target,
529                     desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
530                     desired_target=desired_share_target,
531                     net=net,
532                 )
533                 
534                 target = net.PARENT.SANE_MAX_TARGET
535                 if desired_pseudoshare_target is None:
536                     if len(self.recent_shares_ts_work) == 50:
537                         hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
538                         target = min(target, 2**256//hash_rate)
539                 else:
540                     target = min(target, desired_pseudoshare_target)
541                 target = max(target, share_info['bits'].target)
542                 for aux_work in current_work.value['mm_chains'].itervalues():
543                     target = max(target, aux_work['target'])
544                 
545                 transactions = [generate_tx] + list(current_work2.value['transactions'])
546                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
547                 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
548                 
549                 getwork_time = time.time()
550                 merkle_branch = current_work2.value['merkle_branch']
551                 
552                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
553                     bitcoin_data.target_to_difficulty(target),
554                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
555                     current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
556                     len(current_work2.value['transactions']),
557                 )
558                 
559                 ba = bitcoin_getwork.BlockAttempt(
560                     version=current_work.value['version'],
561                     previous_block=current_work.value['previous_block'],
562                     merkle_root=merkle_root,
563                     timestamp=current_work2.value['time'],
564                     bits=current_work.value['bits'],
565                     share_target=target,
566                 )
567                 
568                 received_header_hashes = set()
569                 
570                 def got_response(header, request):
571                     assert header['merkle_root'] == merkle_root
572                     
573                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
574                     pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
575                     on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
576                     
577                     try:
578                         if pow_hash <= header['bits'].target or p2pool.DEBUG:
579                             submit_block(dict(header=header, txs=transactions), ignore_failure=False)
580                             if pow_hash <= header['bits'].target:
581                                 print
582                                 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
583                                 print
584                                 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
585                     except:
586                         log.err(None, 'Error while processing potential block:')
587                     
588                     for aux_work, index, hashes in mm_later:
589                         try:
590                             if pow_hash <= aux_work['target'] or p2pool.DEBUG:
591                                 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
592                                     pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
593                                     bitcoin_data.aux_pow_type.pack(dict(
594                                         merkle_tx=dict(
595                                             tx=transactions[0],
596                                             block_hash=header_hash,
597                                             merkle_branch=merkle_branch,
598                                             index=0,
599                                         ),
600                                         merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
601                                         index=index,
602                                         parent_block_header=header,
603                                     )).encode('hex'),
604                                 )
605                                 @df.addCallback
606                                 def _(result):
607                                     if result != (pow_hash <= aux_work['target']):
608                                         print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
609                                     else:
610                                         print 'Merged block submittal result: %s' % (result,)
611                                 @df.addErrback
612                                 def _(err):
613                                     log.err(err, 'Error submitting merged block:')
614                         except:
615                             log.err(None, 'Error while processing merged mining POW:')
616                     
617                     if pow_hash <= share_info['bits'].target:
618                         min_header = dict(header);del min_header['merkle_root']
619                         hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
620                         share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
621                         
622                         print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
623                             request.getUser(),
624                             p2pool_data.format_hash(share.hash),
625                             p2pool_data.format_hash(share.previous_hash),
626                             time.time() - getwork_time,
627                             ' DEAD ON ARRIVAL' if not on_time else '',
628                         )
629                         my_share_hashes.add(share.hash)
630                         if not on_time:
631                             my_doa_share_hashes.add(share.hash)
632                         
633                         tracker.add(share)
634                         if not p2pool.DEBUG:
635                             tracker.verified.add(share)
636                         set_real_work2()
637                         
638                         try:
639                             if pow_hash <= header['bits'].target or p2pool.DEBUG:
640                                 for peer in p2p_node.peers.itervalues():
641                                     peer.sendShares([share])
642                                 shared_share_hashes.add(share.hash)
643                         except:
644                             log.err(None, 'Error forwarding block solution:')
645                     
646                     if pow_hash > target:
647                         print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
648                         print '    Hash:   %56x' % (pow_hash,)
649                         print '    Target: %56x' % (target,)
650                     elif header_hash in received_header_hashes:
651                         print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
652                     else:
653                         received_header_hashes.add(header_hash)
654                         
655                         pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
656                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
657                         while len(self.recent_shares_ts_work) > 50:
658                             self.recent_shares_ts_work.pop(0)
659                         local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
660                     
661                     return on_time
662                 
663                 return ba, got_response
664         
665         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
666         
667         web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
668         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
669         
670         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
671         
672         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
673             pass
674         
675         print '    ...success!'
676         print
677         
678         
679         @defer.inlineCallbacks
680         def work_poller():
681             while True:
682                 flag = factory.new_block.get_deferred()
683                 try:
684                     yield set_real_work1()
685                 except:
686                     log.err()
687                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
688         work_poller()
689         
690         
691         # done!
692         print 'Started successfully!'
693         print
694         
695         
696         if hasattr(signal, 'SIGALRM'):
697             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
698                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
699             ))
700             signal.siginterrupt(signal.SIGALRM, False)
701             task.LoopingCall(signal.alarm, 30).start(1)
702         
703         if args.irc_announce:
704             from twisted.words.protocols import irc
705             class IRCClient(irc.IRCClient):
706                 nickname = 'p2pool%02i' % (random.randrange(100),)
707                 channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
708                 def lineReceived(self, line):
709                     print repr(line)
710                     irc.IRCClient.lineReceived(self, line)
711                 def signedOn(self):
712                     irc.IRCClient.signedOn(self)
713                     self.factory.resetDelay()
714                     self.join(self.channel)
715                     self.watch_id = tracker.verified.added.watch(self._new_share)
716                     self.announced_hashes = set()
717                     self.delayed_messages = {}
718                 def privmsg(self, user, channel, message):
719                     if channel == self.channel and message in self.delayed_messages:
720                         self.delayed_messages.pop(message).cancel()
721                 def _new_share(self, share):
722                     if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
723                         self.announced_hashes.add(share.header_hash)
724                         message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
725                         self.delayed_messages[message] = reactor.callLater(random.expovariate(1/60), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
726                 def connectionLost(self, reason):
727                     tracker.verified.added.unwatch(self.watch_id)
728                     print 'IRC connection lost:', reason.getErrorMessage()
729             class IRCClientFactory(protocol.ReconnectingClientFactory):
730                 protocol = IRCClient
731             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
732         
733         @defer.inlineCallbacks
734         def status_thread():
735             last_str = None
736             last_time = 0
737             while True:
738                 yield deferral.sleep(3)
739                 try:
740                     if time.time() > current_work2.value['last_update'] + 60:
741                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
742                     
743                     height = tracker.get_height(current_work.value['best_share_hash'])
744                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
745                         height,
746                         len(tracker.verified.shares),
747                         len(tracker.shares),
748                         len(p2p_node.peers),
749                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
750                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
751                     
752                     datums, dt = local_rate_monitor.get_datums_in_last()
753                     my_att_s = sum(datum['work']/dt for datum in datums)
754                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
755                         math.format(int(my_att_s)),
756                         math.format_dt(dt),
757                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
758                         math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
759                     )
760                     
761                     if height > 2:
762                         (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
763                         stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
764                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
765                         
766                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
767                             shares, stale_orphan_shares, stale_doa_shares,
768                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
769                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
770                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
771                         )
772                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
773                             math.format(int(real_att_s)),
774                             100*stale_prop,
775                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
776                         )
777                     
778                     if this_str != last_str or time.time() > last_time + 15:
779                         print this_str
780                         last_str = this_str
781                         last_time = time.time()
782                 except:
783                     log.err()
784         status_thread()
785     except:
786         log.err(None, 'Fatal error:')
787         reactor.stop()
788
789 def run():
790     class FixedArgumentParser(argparse.ArgumentParser):
791         def _read_args_from_files(self, arg_strings):
792             # expand arguments referencing files
793             new_arg_strings = []
794             for arg_string in arg_strings:
795                 
796                 # for regular arguments, just add them back into the list
797                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
798                     new_arg_strings.append(arg_string)
799                 
800                 # replace arguments referencing files with the file content
801                 else:
802                     try:
803                         args_file = open(arg_string[1:])
804                         try:
805                             arg_strings = []
806                             for arg_line in args_file.read().splitlines():
807                                 for arg in self.convert_arg_line_to_args(arg_line):
808                                     arg_strings.append(arg)
809                             arg_strings = self._read_args_from_files(arg_strings)
810                             new_arg_strings.extend(arg_strings)
811                         finally:
812                             args_file.close()
813                     except IOError:
814                         err = sys.exc_info()[1]
815                         self.error(str(err))
816             
817             # return the modified argument list
818             return new_arg_strings
819         
820         def convert_arg_line_to_args(self, arg_line):
821             return [arg for arg in arg_line.split() if arg.strip()]
822     
823     
824     realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
825     
826     parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
827     parser.add_argument('--version', action='version', version=p2pool.__version__)
828     parser.add_argument('--net',
829         help='use specified network (default: bitcoin)',
830         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
831     parser.add_argument('--testnet',
832         help='''use the network's testnet''',
833         action='store_const', const=True, default=False, dest='testnet')
834     parser.add_argument('--debug',
835         help='enable debugging mode',
836         action='store_const', const=True, default=False, dest='debug')
837     parser.add_argument('-a', '--address',
838         help='generate payouts to this address (default: <address requested from bitcoind>)',
839         type=str, action='store', default=None, dest='address')
840     parser.add_argument('--datadir',
841         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
842         type=str, action='store', default=None, dest='datadir')
843     parser.add_argument('--logfile',
844         help='''log to this file (default: data/<NET>/log)''',
845         type=str, action='store', default=None, dest='logfile')
846     parser.add_argument('--merged',
847         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
848         type=str, action='append', default=[], dest='merged_urls')
849     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
850         help='donate this percentage of work to author of p2pool (default: 0.5)',
851         type=float, action='store', default=0.5, dest='donation_percentage')
852     parser.add_argument('--irc-announce',
853         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
854         action='store_true', default=False, dest='irc_announce')
855     
856     p2pool_group = parser.add_argument_group('p2pool interface')
857     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
858         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
859         type=int, action='store', default=None, dest='p2pool_port')
860     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
861         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
862         type=str, action='append', default=[], dest='p2pool_nodes')
863     parser.add_argument('--disable-upnp',
864         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
865         action='store_false', default=True, dest='upnp')
866     
867     worker_group = parser.add_argument_group('worker interface')
868     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
869         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
870         type=str, action='store', default=None, dest='worker_endpoint')
871     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
872         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
873         type=float, action='store', default=0, dest='worker_fee')
874     
875     bitcoind_group = parser.add_argument_group('bitcoind interface')
876     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
877         help='connect to this address (default: 127.0.0.1)',
878         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
879     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
880         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
881         type=int, action='store', default=None, dest='bitcoind_rpc_port')
882     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
883         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
884         type=int, action='store', default=None, dest='bitcoind_p2p_port')
885     
886     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
887         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
888         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
889     
890     args = parser.parse_args()
891     
892     if args.debug:
893         p2pool.DEBUG = True
894     
895     net_name = args.net_name + ('_testnet' if args.testnet else '')
896     net = networks.nets[net_name]
897     
898     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
899     if not os.path.exists(datadir_path):
900         os.makedirs(datadir_path)
901     
902     if len(args.bitcoind_rpc_userpass) > 2:
903         parser.error('a maximum of two arguments are allowed')
904     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
905     
906     if args.bitcoind_rpc_password is None:
907         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
908             parser.error('This network has no configuration file function. Manually enter your RPC password.')
909         conf_path = net.PARENT.CONF_FILE_FUNC()
910         if not os.path.exists(conf_path):
911             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
912                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
913                 '''\r\n'''
914                 '''server=1\r\n'''
915                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
916         with open(conf_path, 'rb') as f:
917             cp = ConfigParser.RawConfigParser()
918             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
919             for conf_name, var_name, var_type in [
920                 ('rpcuser', 'bitcoind_rpc_username', str),
921                 ('rpcpassword', 'bitcoind_rpc_password', str),
922                 ('rpcport', 'bitcoind_rpc_port', int),
923                 ('port', 'bitcoind_p2p_port', int),
924             ]:
925                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
926                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
927         if args.bitcoind_rpc_password is None:
928             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
929     
930     if args.bitcoind_rpc_username is None:
931         args.bitcoind_rpc_username = ''
932     
933     if args.bitcoind_rpc_port is None:
934         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
935     
936     if args.bitcoind_p2p_port is None:
937         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
938     
939     if args.p2pool_port is None:
940         args.p2pool_port = net.P2P_PORT
941     
942     if args.worker_endpoint is None:
943         worker_endpoint = '', net.WORKER_PORT
944     elif ':' not in args.worker_endpoint:
945         worker_endpoint = '', int(args.worker_endpoint)
946     else:
947         addr, port = args.worker_endpoint.rsplit(':', 1)
948         worker_endpoint = addr, int(port)
949     
950     if args.address is not None:
951         try:
952             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
953         except Exception, e:
954             parser.error('error parsing address: ' + repr(e))
955     else:
956         args.pubkey_hash = None
957     
958     def separate_url(url):
959         s = urlparse.urlsplit(url)
960         if '@' not in s.netloc:
961             parser.error('merged url netloc must contain an "@"')
962         userpass, new_netloc = s.netloc.rsplit('@', 1)
963         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
964     merged_urls = map(separate_url, args.merged_urls)
965     
966     if args.logfile is None:
967         args.logfile = os.path.join(datadir_path, 'log')
968     
969     logfile = logging.LogFile(args.logfile)
970     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
971     sys.stdout = logging.AbortPipe(pipe)
972     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
973     if hasattr(signal, "SIGUSR1"):
974         def sigusr1(signum, frame):
975             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
976             logfile.reopen()
977             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
978         signal.signal(signal.SIGUSR1, sigusr1)
979     task.LoopingCall(logfile.reopen).start(5)
980     
981     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
982     reactor.run()