# [p2pool.git] / p2pool / main.py
# commit: cleaned up address saving
from __future__ import division

import ConfigParser
import StringIO
import argparse
import os
import random
import struct
import sys
import time
import signal
import traceback
import urlparse

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

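# A sketch of the dict getwork() fires with, assuming a typical getmemorypool
# response; field names mirror the unpacking above, concrete values are made up.
#
#     dict(
#         version=1,
#         previous_block_hash=0x00000000000008a3L,  # int parsed from hex
#         transactions=[],                   # objects decoded by bitcoin_data.tx_type
#         merkle_branch=[],                  # branch for index 0, the coinbase slot
#         subsidy=5000000000,                # coinbasevalue, in satoshis
#         time=1325376000,
#         bits=bitcoin_data.FloatingInteger(0x1d00ffff),
#         coinbaseflags='',
#     )
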
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
            height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((yield bitcoind.rpc_getblock('%x' % (block_hash,)))['blockcount'])))
            best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
            def get_height_rel_highest(block_hash):
                this_height = height_cacher.call_now(block_hash, 0)
                best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
                best_height_cached.set(max(best_height_cached.value, this_height, best_height))
                return this_height - best_height_cached.value
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
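        # In either branch, get_height_rel_highest(h) returns the height of
        # block h minus the best height seen so far, so the current tip maps
        # to 0 and older blocks map to negative values.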
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
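        # Re-request backoff used above (and again in handle_share_hashes
        # below): a share may be re-requested only after 10 * 1.5**count
        # seconds, i.e. roughly 10s, 15s, 22.5s, 34s, ... between attempts.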
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
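        # Usage sketch with hypothetical inputs: parse('1.2.3.4:9333') fires
        # with ('1.2.3.4', 9333); parse('node.example.org') resolves the
        # hostname and falls back to net.P2P_PORT for the port.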
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
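        # addrs.txt therefore holds one repr()'d ((host, port), info) pair per
        # line, matching the eval() reader above; with made-up values a line
        # looks like: (('1.2.3.4', 9333), (0, 1325376000.0, 1325378000.0))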
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
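        # Worked example with made-up numbers: if this node made 100 shares,
        # 90 of them now in the chain, and 14 DOA shares, 10 of them in the
        # chain, then my_shares_not_in_chain = 10 and
        # my_doa_shares_not_in_chain = 4, so this returns
        # ((6, 4), 100, (orphans_recorded_in_chain, doas_recorded_in_chain)):
        # 6 presumed orphans and 4 dead on arrival.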
        
        
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                pubkey_hash = my_pubkey_hash
                max_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        max_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                try:
                    pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                except: # XXX blah
                    pass
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                return pubkey_hash, max_target
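            # Username format sketch (hypothetical miner login): a username of
            # '1SomeBitcoinAddress/10' mines to that address and sets a
            # minimum pseudoshare difficulty of 10; an unparsable address
            # falls back to this node's payout address, and a worker_fee
            # percent of requests is redirected there as the operator's fee.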
            
            def get_work(self, pubkey_hash, max_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                new = time.time() > net.SWITCH_TIME
                
                if new:
                    share_info, generate_tx = p2pool_data.new_generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=random.randrange(2**32),
                            pubkey_hash=pubkey_hash,
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        desired_target=max_target,
                        net=net,
                    )
                else:
                    share_info, generate_tx = p2pool_data.generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=struct.pack('<Q', random.randrange(2**64)),
                            new_script=bitcoin_data.pubkey_hash_to_script2(pubkey_hash),
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        net=net,
                    )
                
                target = net.PARENT.SANE_MAX_TARGET
                if len(self.recent_shares_ts_work) == 50:
                    hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                    target = min(target, 2**256//(hash_rate * 5))
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
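                # The throttle above, with made-up numbers: once 50 recent
                # submissions are recorded, a miner estimated at 1 GH/s
                # (hash_rate = 1e9) has its target capped near 2**256/(5e9),
                # i.e. about one pseudoshare per 5 seconds on average.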
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
                            def submit_block():
                                if factory.conn.value is None:
                                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                    raise deferral.RetrySilentlyException()
                                factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                            submit_block()
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        if new:
                            min_header = dict(header)
                            del min_header['merkle_root']
                            hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                            share = p2pool_data.NewShare(net, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        else:
                            share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
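                    # NOTE: grapher is not defined in this file; the two calls
                    # below assume a graphing object with add_localrate_point
                    # and add_localminer_point methods is in scope (presumably
                    # from p2pool's graphs module).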
                    if pow_hash <= target and header_hash not in received_header_hashes:
                        reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                        if request.getPassword() == vip_pass:
                            reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    if header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    received_header_hashes.add(header_hash)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
        reactor.stop()

def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
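    # Usage sketch with a hypothetical file: `python run_p2pool.py @p2pool.conf`,
    # where p2pool.conf contains the lines
    #     --net bitcoin
    #     --give-author 1.0
    # expands via the overrides above to ['--net', 'bitcoin', '--give-author', '1.0'].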
    
    
    realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments is allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
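    # e.g., a hypothetical bitcoin.conf containing
    #     rpcuser=bitcoinrpc
    #     rpcpassword=s3cret
    #     rpcport=8332
    # fills in whichever of the four settings above were not given on the
    # command line.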
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
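    # e.g., with hypothetical flags: '-w 9332' listens on all interfaces on
    # port 9332, while '-w 127.0.0.1:9332' binds only the loopback interface.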
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
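    # e.g. separate_url('http://ncuser:ncpass@127.0.0.1:10332/') returns
    # ('http://127.0.0.1:10332/', 'ncuser:ncpass'), matching the --merged
    # example above.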
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()