s/PORT/CONNS/
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import base64
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
19
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface, height_tracker
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
25
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a block template from bitcoind via getmemorypool and return a
    dict with parsed fields: version, previous_block_hash, transactions,
    merkle_branch, subsidy, time, bits, coinbaseflags.

    Retried up to 3 times on error; raises RetrySilentlyException (after
    printing an upgrade hint) if bitcoind lacks getmemorypool (-32601).
    """
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        # merkle branch for position 0 (the coinbase, not yet built -- 0 placeholder)
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        # 'bits' arrives either hex-encoded (string) or as an integer, depending on bitcoind version
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        # coinbase flag bytes: prefer 'coinbaseflags', else concatenate 'coinbaseaux' values, else empty
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
47
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    # Top-level node startup: verify bitcoind connectivity (RPC and P2P),
    # determine the payout address, then (below) load shares, join the
    # p2pool network and start serving workers.
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)))
        # network-specific identity check -- makes sure we're talking to the right daemon
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        # payout address: --pubkey-hash if given, else cached address
        # (re-validated against bitcoind), else a fresh one from bitcoind
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            # refresh the cache with whatever address we settled on
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
104         
        my_share_hashes = set()  # hashes of shares this node generated
        my_doa_share_hashes = set()  # hashes of this node's dead-on-arrival shares
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()  # shares already exchanged with peers (or loaded from disk)
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        # replay the on-disk share store into the tracker
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0  # loaded from disk, not freshly received
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                # verified hash with no matching share -- drop the stale record
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        # keep the on-disk store and the shared set in sync with tracker pruning
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)  # share_hash -> (last_request_time, request_count)
149         
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            # Poll bitcoind for fresh work and split it across the two work
            # variables: current_work2 (no long-poll) first, then
            # pre_current_work, which triggers downstream recomputation.
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        # callable giving a block's height relative to the highest known block
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
171         
        def set_real_work2():
            # Recompute the best share chain and publish combined work
            # (bitcoind template + best share + merged-mining chains) to
            # current_work; then request desired-but-missing parent shares
            # from peers.
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff on repeated requests for the same share
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # candidate peers: anyone known to hold a head descending from this share
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # usually ask a random knowledgeable peer; sometimes fall back to the announcer
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop at our heads (or just short of them), capped at 100 stops
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
212         
213         
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            # Poll one merged-mining daemon forever (1s interval), keeping
            # its latest aux block in pre_merged_work keyed by chain id.
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
231         
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                """Insert shares received from a peer into the tracker,
                remember the peer as a holder of the first share's hash, and
                retrigger work selection if anything new arrived."""
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                """Request unknown shares advertised by a peer, using the
                same exponential request backoff as set_real_work2."""
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                """Serve a peer's getshares request: walk each requested
                chain until a stop hash, capping total parents at ~1000
                across all requested hashes."""
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
289         
290         @deferral.retry('Error submitting block: (will retry)', 10, 10)
291         @defer.inlineCallbacks
292         def submit_block(block, ignore_failure):
293             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
294             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
295             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
296                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
297         
        @tracker.verified.added.watch
        def _(share):
            # A verified share whose POW also meets the parent (bitcoin)
            # target is a full block -- pass it along to bitcoind.
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
306         
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            # Resolve 'host' or 'host:port' into an (ip, port) tuple,
            # defaulting to the network's standard p2pool P2P port.
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
316         
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                # addrs.txt holds repr()'d (addr, value) tuples, one per line
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        # seed the address store with the network's bootstrap nodes
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        # nodes explicitly requested on the command line are always connected to
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        # persist the known-address store to addrs.txt once a minute
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
349         
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            # Collect not-yet-shared shares near the new best head (walking
            # back at most 5, stopping at the first already-shared one) and
            # broadcast them, skipping the peer a share originated from.
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            # Persist the current best chain (up to 2*CHAIN_LENGTH shares)
            # and its verified hashes to the share store.
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
374         
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # Periodically (re)establish a UPnP mapping for the p2pool
                # port; all failures are non-fatal (logged only in DEBUG).
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    # exponentially-distributed sleep, ~120s on average
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
391         
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        # load (or generate and cache) the worker password; per the message
        # below it is only required for generating graphs
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
404         
405         # setup worker logic
406         
407         removed_unstales_var = variable.Variable((0, 0, 0))
408         removed_doa_unstales_var = variable.Variable(0)
409         @tracker.verified.removed.watch
410         def _(share):
411             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
412                 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
413                 removed_unstales_var.set((
414                     removed_unstales_var.value[0] + 1,
415                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
416                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
417                 ))
418             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
419                 removed_doa_unstales.set(removed_doa_unstales.value + 1)
420         
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            # Combine live tracker deltas with the counters accumulated for
            # shares that have already been pruned from the tracker.
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)  # 10-minute window
439         
440         class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                """Bridge between mining workers and the p2pool node."""
                worker_interface.WorkerBridge.__init__(self)
                # workers are long-polled off the shared work-changed event
                self.new_work_event = current_work.changed
                # (timestamp, work) samples used later to estimate local hashrate
                self.recent_shares_ts_work = []
445             
            def preprocess_request(self, request):
                # Parse the worker's username into
                # (pubkey_hash, desired_share_target, desired_pseudoshare_target).
                # Optional suffixes: '+DIFF' sets a pseudoshare difficulty and
                # '/DIFF' a minimum share difficulty; the remainder is treated
                # as a payout address.
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass # malformed difficulty suffix -- ignore it
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass # malformed difficulty suffix -- ignore it
                
                # with probability worker_fee percent, pay the node operator instead
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # XXX blah
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
474             
475             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
476                 if len(p2p_node.peers) == 0 and net.PERSIST:
477                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
478                 if current_work.value['best_share_hash'] is None and net.PERSIST:
479                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
480                 if time.time() > current_work2.value['last_update'] + 60:
481                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
482                 
483                 if current_work.value['mm_chains']:
484                     tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
485                     mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
486                     mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
487                         merkle_root=bitcoin_data.merkle_hash(mm_hashes),
488                         size=size,
489                         nonce=0,
490                     ))
491                     mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
492                 else:
493                     mm_data = ''
494                     mm_later = []
495                 
496                 share_info, generate_tx = p2pool_data.Share.generate_transaction(
497                     tracker=tracker,
498                     share_data=dict(
499                         previous_share_hash=current_work.value['best_share_hash'],
500                         coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
501                         nonce=random.randrange(2**32),
502                         pubkey_hash=pubkey_hash,
503                         subsidy=current_work2.value['subsidy'],
504                         donation=math.perfect_round(65535*args.donation_percentage/100),
505                         stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
506                             253 if orphans > orphans_recorded_in_chain else
507                             254 if doas > doas_recorded_in_chain else
508                             0
509                         )(*get_stale_counts()),
510                     ),
511                     block_target=current_work.value['bits'].target,
512                     desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
513                     desired_target=desired_share_target,
514                     net=net,
515                 )
516                 
517                 target = net.PARENT.SANE_MAX_TARGET
518                 if desired_pseudoshare_target is None:
519                     if len(self.recent_shares_ts_work) == 50:
520                         hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
521                         target = min(target, 2**256//hash_rate)
522                 else:
523                     target = min(target, desired_pseudoshare_target)
524                 target = max(target, share_info['bits'].target)
525                 for aux_work in current_work.value['mm_chains'].itervalues():
526                     target = max(target, aux_work['target'])
527                 
528                 transactions = [generate_tx] + list(current_work2.value['transactions'])
529                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
530                 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
531                 
532                 getwork_time = time.time()
533                 merkle_branch = current_work2.value['merkle_branch']
534                 
535                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
536                     bitcoin_data.target_to_difficulty(target),
537                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
538                     current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
539                     len(current_work2.value['transactions']),
540                 )
541                 
542                 ba = bitcoin_getwork.BlockAttempt(
543                     version=current_work.value['version'],
544                     previous_block=current_work.value['previous_block'],
545                     merkle_root=merkle_root,
546                     timestamp=current_work2.value['time'],
547                     bits=current_work.value['bits'],
548                     share_target=target,
549                 )
550                 
551                 received_header_hashes = set()
552                 
553                 def got_response(header, request):
554                     assert header['merkle_root'] == merkle_root
555                     
556                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
557                     pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
558                     on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
559                     
560                     try:
561                         if pow_hash <= header['bits'].target or p2pool.DEBUG:
562                             submit_block(dict(header=header, txs=transactions), ignore_failure=False)
563                             if pow_hash <= header['bits'].target:
564                                 print
565                                 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
566                                 print
567                                 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
568                     except:
569                         log.err(None, 'Error while processing potential block:')
570                     
571                     for aux_work, index, hashes in mm_later:
572                         try:
573                             if pow_hash <= aux_work['target'] or p2pool.DEBUG:
574                                 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
575                                     pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
576                                     bitcoin_data.aux_pow_type.pack(dict(
577                                         merkle_tx=dict(
578                                             tx=transactions[0],
579                                             block_hash=header_hash,
580                                             merkle_branch=merkle_branch,
581                                             index=0,
582                                         ),
583                                         merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
584                                         index=index,
585                                         parent_block_header=header,
586                                     )).encode('hex'),
587                                 )
588                                 @df.addCallback
589                                 def _(result):
590                                     if result != (pow_hash <= aux_work['target']):
591                                         print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
592                                     else:
593                                         print 'Merged block submittal result: %s' % (result,)
594                                 @df.addErrback
595                                 def _(err):
596                                     log.err(err, 'Error submitting merged block:')
597                         except:
598                             log.err(None, 'Error while processing merged mining POW:')
599                     
600                     if pow_hash <= share_info['bits'].target:
601                         min_header = dict(header);del min_header['merkle_root']
602                         hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
603                         share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
604                         
605                         print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
606                             request.getUser(),
607                             p2pool_data.format_hash(share.hash),
608                             p2pool_data.format_hash(share.previous_hash),
609                             time.time() - getwork_time,
610                             ' DEAD ON ARRIVAL' if not on_time else '',
611                         )
612                         my_share_hashes.add(share.hash)
613                         if not on_time:
614                             my_doa_share_hashes.add(share.hash)
615                         
616                         tracker.add(share)
617                         if not p2pool.DEBUG:
618                             tracker.verified.add(share)
619                         set_real_work2()
620                         
621                         try:
622                             if pow_hash <= header['bits'].target or p2pool.DEBUG:
623                                 for peer in p2p_node.peers.itervalues():
624                                     peer.sendShares([share])
625                                 shared_share_hashes.add(share.hash)
626                         except:
627                             log.err(None, 'Error forwarding block solution:')
628                     
629                     if pow_hash > target:
630                         print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
631                         print '    Hash:   %56x' % (pow_hash,)
632                         print '    Target: %56x' % (target,)
633                     elif header_hash in received_header_hashes:
634                         print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
635                     else:
636                         received_header_hashes.add(header_hash)
637                         
638                         pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
639                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
640                         while len(self.recent_shares_ts_work) > 50:
641                             self.recent_shares_ts_work.pop(0)
642                         local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
643                     
644                     return on_time
645                 
646                 return ba, got_response
647         
648         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
649         
650         web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
651         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
652         
653         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
654         
655         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
656             pass
657         
658         print '    ...success!'
659         print
660         
661         
        @defer.inlineCallbacks
        def work_poller():
            # Poll bitcoind for new work forever. The new-block deferred is
            # grabbed *before* polling so that a block announcement arriving
            # while set_real_work1() is in flight still wakes the next
            # iteration immediately instead of being missed.
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                # Re-poll after 15 seconds, or sooner if a new block arrives.
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
672         
673         
        # done!
        print 'Started successfully!'
        print
        
        
        # Watchdog: rearm a 30-second SIGALRM once per second; if the reactor
        # stalls for 30s the alarm fires and the current stack is dumped to
        # stderr (diagnostic only — the process keeps running).
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            # don't let the alarm interrupt blocking system calls
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
685         
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                # Announces found blocks on freenode; randomized nick avoids
                # collisions between multiple p2pool instances.
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    # log raw IRC traffic for debugging
                    print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    # watch newly-verified shares so block solutions can be announced
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    # if another node announces the same block first, cancel
                    # our own pending (randomly delayed) announcement
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    # only announce shares that solved a real block, once each,
                    # and only if they are recent (within 10 minutes)
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        # exponentially-distributed delay (mean 60s) spreads out
                        # announcements from multiple nodes seeing the same block
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/60), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
715         
        @defer.inlineCallbacks
        def status_thread():
            # Periodically print a status summary (chain height, peers, local
            # and pool hash rates). Reprints every 3s only when the text
            # changed, or unconditionally every 15s as a heartbeat.
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    # warn loudly if bitcoind hasn't delivered fresh work lately
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    # local miner statistics from recently submitted pseudoshares
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    # pool-wide statistics need a few shares of history
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
768     except:
769         log.err(None, 'Fatal error:')
770         reactor.stop()
771
772 def run():
773     class FixedArgumentParser(argparse.ArgumentParser):
774         def _read_args_from_files(self, arg_strings):
775             # expand arguments referencing files
776             new_arg_strings = []
777             for arg_string in arg_strings:
778                 
779                 # for regular arguments, just add them back into the list
780                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
781                     new_arg_strings.append(arg_string)
782                 
783                 # replace arguments referencing files with the file content
784                 else:
785                     try:
786                         args_file = open(arg_string[1:])
787                         try:
788                             arg_strings = []
789                             for arg_line in args_file.read().splitlines():
790                                 for arg in self.convert_arg_line_to_args(arg_line):
791                                     arg_strings.append(arg)
792                             arg_strings = self._read_args_from_files(arg_strings)
793                             new_arg_strings.extend(arg_strings)
794                         finally:
795                             args_file.close()
796                     except IOError:
797                         err = sys.exc_info()[1]
798                         self.error(str(err))
799             
800             # return the modified argument list
801             return new_arg_strings
802         
803         def convert_arg_line_to_args(self, arg_line):
804             return [arg for arg in arg_line.split() if arg.strip()]
805     
806     
807     realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
808     
809     parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
810     parser.add_argument('--version', action='version', version=p2pool.__version__)
811     parser.add_argument('--net',
812         help='use specified network (default: bitcoin)',
813         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
814     parser.add_argument('--testnet',
815         help='''use the network's testnet''',
816         action='store_const', const=True, default=False, dest='testnet')
817     parser.add_argument('--debug',
818         help='enable debugging mode',
819         action='store_const', const=True, default=False, dest='debug')
820     parser.add_argument('-a', '--address',
821         help='generate payouts to this address (default: <address requested from bitcoind>)',
822         type=str, action='store', default=None, dest='address')
823     parser.add_argument('--datadir',
824         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
825         type=str, action='store', default=None, dest='datadir')
826     parser.add_argument('--logfile',
827         help='''log to this file (default: data/<NET>/log)''',
828         type=str, action='store', default=None, dest='logfile')
829     parser.add_argument('--merged',
830         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
831         type=str, action='append', default=[], dest='merged_urls')
832     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
833         help='donate this percentage of work to author of p2pool (default: 0.5)',
834         type=float, action='store', default=0.5, dest='donation_percentage')
835     parser.add_argument('--irc-announce',
836         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
837         action='store_true', default=False, dest='irc_announce')
838     
839     p2pool_group = parser.add_argument_group('p2pool interface')
840     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
841         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
842         type=int, action='store', default=None, dest='p2pool_port')
843     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
844         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
845         type=str, action='append', default=[], dest='p2pool_nodes')
846     parser.add_argument('--disable-upnp',
847         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
848         action='store_false', default=True, dest='upnp')
849     p2pool_group.add_argument('--max-conns', metavar='CONNS',
850         help='maximum incoming connections (default: 40)',
851         type=int, action='store', default=40, dest='p2pool_conns')
852     
853     worker_group = parser.add_argument_group('worker interface')
854     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
855         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
856         type=str, action='store', default=None, dest='worker_endpoint')
857     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
858         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
859         type=float, action='store', default=0, dest='worker_fee')
860     
861     bitcoind_group = parser.add_argument_group('bitcoind interface')
862     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
863         help='connect to this address (default: 127.0.0.1)',
864         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
865     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
866         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
867         type=int, action='store', default=None, dest='bitcoind_rpc_port')
868     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
869         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
870         type=int, action='store', default=None, dest='bitcoind_p2p_port')
871     
872     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
873         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
874         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
875     
876     args = parser.parse_args()
877     
878     if args.debug:
879         p2pool.DEBUG = True
880     
881     net_name = args.net_name + ('_testnet' if args.testnet else '')
882     net = networks.nets[net_name]
883     
884     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
885     if not os.path.exists(datadir_path):
886         os.makedirs(datadir_path)
887     
888     if len(args.bitcoind_rpc_userpass) > 2:
889         parser.error('a maximum of two arguments are allowed')
890     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
891     
892     if args.bitcoind_rpc_password is None:
893         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
894             parser.error('This network has no configuration file function. Manually enter your RPC password.')
895         conf_path = net.PARENT.CONF_FILE_FUNC()
896         if not os.path.exists(conf_path):
897             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
898                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
899                 '''\r\n'''
900                 '''server=1\r\n'''
901                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
902         with open(conf_path, 'rb') as f:
903             cp = ConfigParser.RawConfigParser()
904             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
905             for conf_name, var_name, var_type in [
906                 ('rpcuser', 'bitcoind_rpc_username', str),
907                 ('rpcpassword', 'bitcoind_rpc_password', str),
908                 ('rpcport', 'bitcoind_rpc_port', int),
909                 ('port', 'bitcoind_p2p_port', int),
910             ]:
911                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
912                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
913         if args.bitcoind_rpc_password is None:
914             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
915     
916     if args.bitcoind_rpc_username is None:
917         args.bitcoind_rpc_username = ''
918     
919     if args.bitcoind_rpc_port is None:
920         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
921     
922     if args.bitcoind_p2p_port is None:
923         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
924     
925     if args.p2pool_port is None:
926         args.p2pool_port = net.P2P_PORT
927     
928     if args.worker_endpoint is None:
929         worker_endpoint = '', net.WORKER_PORT
930     elif ':' not in args.worker_endpoint:
931         worker_endpoint = '', int(args.worker_endpoint)
932     else:
933         addr, port = args.worker_endpoint.rsplit(':', 1)
934         worker_endpoint = addr, int(port)
935     
936     if args.address is not None:
937         try:
938             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
939         except Exception, e:
940             parser.error('error parsing address: ' + repr(e))
941     else:
942         args.pubkey_hash = None
943     
944     def separate_url(url):
945         s = urlparse.urlsplit(url)
946         if '@' not in s.netloc:
947             parser.error('merged url netloc must contain an "@"')
948         userpass, new_netloc = s.netloc.rsplit('@', 1)
949         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
950     merged_urls = map(separate_url, args.merged_urls)
951     
952     if args.logfile is None:
953         args.logfile = os.path.join(datadir_path, 'log')
954     
955     logfile = logging.LogFile(args.logfile)
956     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
957     sys.stdout = logging.AbortPipe(pipe)
958     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
959     if hasattr(signal, "SIGUSR1"):
960         def sigusr1(signum, frame):
961             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
962             logfile.reopen()
963             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
964         signal.signal(signal.SIGUSR1, sigusr1)
965     task.LoopingCall(logfile.reopen).start(5)
966     
967     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
968     reactor.run()