# p2pool/main.py
from __future__ import division

import ConfigParser
import StringIO
import argparse
import os
import random
import sys
import time
import signal
import traceback
import urlparse

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

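# getwork: fetch a block template from bitcoind via the old getmemorypool RPC
# (the predecessor of getblocktemplate) and normalize it into the dict the rest
# of p2pool expects; deferral.retry retries transient failures.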
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
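        # Hashes of shares this node created itself, and the subset that were dead
        # on arrival (solved against a head that was no longer the best share);
        # both feed the stale-rate statistics below.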
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # current_work2 is set first because watchers hook on pre_current_work and read it
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, pre_current_work, net)
        
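        # set_real_work2 recomputes the best share via tracker.think, publishes it
        # (plus any merged-mining work) in current_work, and requests missing parent
        # shares from peers likely to have them, backing off per share hash.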
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # share was received while tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
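        # Poll each merged-mining daemon's getauxblock about once a second and fold
        # the aux work into pre_merged_work, which ends up in current_work's
        # 'mm_chains' and from there in the coinbase.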
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
        
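        # submit_block hands a solved block to bitcoind; with the old getmemorypool
        # RPC, passing the hex-encoded block as an argument submits it. The result
        # is checked against whether the header really meets the block target, so
        # unexpected acceptances or rejections get logged.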
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
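        # parse resolves an 'ADDR[:PORT]' string into an (ip, port) tuple, with the
        # port defaulting to the network's standard p2pool P2P port.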
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
        # broadcast shares to peers when the best chain changes, stopping at the
        # first share that has already been shared
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
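        # Once a minute, persist the current best chain (up to twice the chain
        # length) to the share store so a restart doesn't have to fetch everything
        # from peers again.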
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
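        # With UPnP enabled, re-request a TCP mapping for the p2pool port from the
        # gateway every two minutes on average.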
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
        
        # setup worker logic
        
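        # Count our shares that are removed from the tracker while still part of the
        # best chain, split by their stale_info marker (0 = none, 253 = orphan
        # announced, 254 = DOA announced), so get_stale_counts stays accurate after
        # old shares age out of the tracker.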
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
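        # WorkerBridge connects miners to the pool: preprocess_request parses the
        # miner's username (payout address, '+pseudoshare difficulty',
        # '/share difficulty'), and get_work builds a block attempt plus the
        # callback that judges whatever the miner submits.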
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # username is not a valid address; pay to this node's own address
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    net=net,
                )
                
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
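                # got_response receives each header the miner solves. One proof of
                # work is checked against three thresholds: the block target (submit
                # to bitcoind), each merged chain's target (submit an aux proof),
                # and the share target (add to the share chain and broadcast).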
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        min_header = dict(header)
                        del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
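        # Refresh work from bitcoind whenever its p2p connection announces a new
        # block, or after 15 seconds, whichever comes first.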
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/60), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
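        # Print a status summary every 3 seconds (suppressing unchanged output for
        # up to 15 seconds): chain height, peers, local and pool hash rates, stale
        # rates, and the expected payout to this node's address.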
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
        reactor.stop()

def run():
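    # argparse's stock fromfile support treats each line of an @file as one
    # argument; this subclass splits lines on whitespace and expands nested @file
    # references instead.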
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (if only one is provided, the username defaults to empty; if neither is provided, P2Pool reads both from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
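        # bitcoin.conf has no section header, so prepend a dummy [x] section to
        # satisfy RawConfigParser, then fill in any settings that weren't given on
        # the command line.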
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
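    # separate_url splits the credentials out of a merged-mining URL like
    # http://user:pass@host:port/, returning (url without credentials, 'user:pass').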
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
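    # Tee stdout/stderr into a timestamped logfile that is reopened every few
    # seconds; SIGUSR1 also forces a reopen, so external log rotation is safe.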
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()