fixed imports and indentation
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import os
7 import random
8 import sys
9 import time
10 import signal
11 import traceback
12 import urlparse
13
14 from twisted.internet import defer, reactor, protocol, task
15 from twisted.web import server
16 from twisted.python import log
17 from nattraverso import portmapper, ipdiscover
18
19 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
20 from bitcoin import worker_interface
21 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
22 from . import p2p, networks, web
23 import p2pool, p2pool.data as p2pool_data
24
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    '''
    Ask bitcoind for a block template (getmemorypool) and normalize the reply.
    
    Fires with a dict holding version, previous_block_hash, transactions,
    merkle_branch, subsidy, time, bits and coinbaseflags.
    '''
    try:
        template = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error as err:
        if err.code != -32601:
            raise
        # -32601 is "method not found": this bitcoind does not offer getmemorypool
        print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
        raise deferral.RetrySilentlyException()
    
    raw_txs = [tx_hex.decode('hex') for tx_hex in template['transactions']]
    
    # the fields are computed one at a time, in the order they appear in the result
    result = {}
    result['version'] = template['version']
    result['previous_block_hash'] = int(template['previousblockhash'], 16)
    result['transactions'] = [bitcoin_data.tx_type.unpack(raw) for raw in raw_txs]
    # the leading 0 presumably stands in for the not-yet-built generation transaction;
    # the branch is computed for index 0
    result['merkle_branch'] = bitcoin_data.calculate_merkle_branch([0] + [bitcoin_data.hash256(raw) for raw in raw_txs], 0)
    result['subsidy'] = template['coinbasevalue']
    result['time'] = template['time']
    
    # 'bits' arrives either as a hex string or as a plain number
    if isinstance(template['bits'], (str, unicode)):
        result['bits'] = bitcoin_data.FloatingIntegerType().unpack(template['bits'].decode('hex')[::-1])
    else:
        result['bits'] = bitcoin_data.FloatingInteger(template['bits'])
    
    # prefer 'coinbaseflags'; otherwise concatenate every 'coinbaseaux' value
    if 'coinbaseflags' in template:
        result['coinbaseflags'] = template['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in template:
        result['coinbaseflags'] = ''.join(x.decode('hex') for x in template['coinbaseaux'].itervalues())
    else:
        result['coinbaseflags'] = ''
    
    defer.returnValue(result)
46
47 @defer.inlineCallbacks
48 def main(args, net, datadir_path, merged_urls, worker_endpoint):
49     try:
50         print 'p2pool (version %s)' % (p2pool.__version__,)
51         print
52         
53         # connect to bitcoind over JSON-RPC and do initial getmemorypool
54         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
55         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
56         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
57         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
58         if not good:
59             print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
60             return
61         temp_work = yield getwork(bitcoind)
62         print '    ...success!'
63         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
64         print
65         
66         # connect to bitcoind over bitcoin-p2p
67         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
68         factory = bitcoin_p2p.ClientFactory(net.PARENT)
69         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
70         yield factory.getProtocol() # waits until handshake is successful
71         print '    ...success!'
72         print
73         
74         print 'Determining payout address...'
75         if args.pubkey_hash is None:
76             address_path = os.path.join(datadir_path, 'cached_payout_address')
77             
78             if os.path.exists(address_path):
79                 with open(address_path, 'rb') as f:
80                     address = f.read().strip('\r\n')
81                 print '    Loaded cached address: %s...' % (address,)
82             else:
83                 address = None
84             
85             if address is not None:
86                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
87                 if not res['isvalid'] or not res['ismine']:
88                     print '    Cached address is either invalid or not controlled by local bitcoind!'
89                     address = None
90             
91             if address is None:
92                 print '    Getting payout address from bitcoind...'
93                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
94             
95             with open(address_path, 'wb') as f:
96                 f.write(address)
97             
98             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
99         else:
100             my_pubkey_hash = args.pubkey_hash
101         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
102         print
103         
104         my_share_hashes = set()
105         my_doa_share_hashes = set()
106         
107         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
108         shared_share_hashes = set()
109         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
110         known_verified = set()
111         recent_blocks = []
112         print "Loading shares..."
113         for i, (mode, contents) in enumerate(ss.get_shares()):
114             if mode == 'share':
115                 if contents.hash in tracker.shares:
116                     continue
117                 shared_share_hashes.add(contents.hash)
118                 contents.time_seen = 0
119                 tracker.add(contents)
120                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
121                     print "    %i" % (len(tracker.shares),)
122             elif mode == 'verified_hash':
123                 known_verified.add(contents)
124             else:
125                 raise AssertionError()
126         print "    ...inserting %i verified shares..." % (len(known_verified),)
127         for h in known_verified:
128             if h not in tracker.shares:
129                 ss.forget_verified_share(h)
130                 continue
131             tracker.verified.add(tracker.shares[h])
132         print "    ...done loading %i shares!" % (len(tracker.shares),)
133         print
134         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
135         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
136         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
137         
138         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
139         
140         pre_current_work = variable.Variable(None)
141         pre_merged_work = variable.Variable({})
142         # information affecting work that should trigger a long-polling update
143         current_work = variable.Variable(None)
144         # information affecting work that should not trigger a long-polling update
145         current_work2 = variable.Variable(None)
146         
147         requested = expiring_dict.ExpiringDict(300)
148         
149         print 'Initializing work...'
150         @defer.inlineCallbacks
151         def set_real_work1():
152             work = yield getwork(bitcoind)
153             current_work2.set(dict(
154                 time=work['time'],
155                 transactions=work['transactions'],
156                 merkle_branch=work['merkle_branch'],
157                 subsidy=work['subsidy'],
158                 clock_offset=time.time() - work['time'],
159                 last_update=time.time(),
160             )) # second set first because everything hooks on the first
161             pre_current_work.set(dict(
162                 version=work['version'],
163                 previous_block=work['previous_block_hash'],
164                 bits=work['bits'],
165                 coinbaseflags=work['coinbaseflags'],
166             ))
167         yield set_real_work1()
168         
169         if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
170             @deferral.DeferredCacher
171             @defer.inlineCallbacks
172             def height_cacher(block_hash):
173                 try:
174                     x = yield bitcoind.rpc_getblock('%x' % (block_hash,))
175                 except jsonrpc.Error, e:
176                     if e.code == -5 and not p2pool.DEBUG:
177                         raise deferral.RetrySilentlyException()
178                     raise
179                 defer.returnValue(x['blockcount'] if 'blockcount' in x else x['height'])
180             best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
181             def get_height_rel_highest(block_hash):
182                 this_height = height_cacher.call_now(block_hash, 0)
183                 best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
184                 best_height_cached.set(max(best_height_cached.value, this_height, best_height))
185                 return this_height - best_height_cached.value
186         else:
187             get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
188         
189         def set_real_work2():
190             best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
191             
192             t = dict(pre_current_work.value)
193             t['best_share_hash'] = best
194             t['mm_chains'] = pre_merged_work.value
195             current_work.set(t)
196             
197             t = time.time()
198             for peer2, share_hash in desired:
199                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
200                     continue
201                 last_request_time, count = requested.get(share_hash, (None, 0))
202                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
203                     continue
204                 potential_peers = set()
205                 for head in tracker.tails[share_hash]:
206                     potential_peers.update(peer_heads.get(head, set()))
207                 potential_peers = [peer for peer in potential_peers if peer.connected2]
208                 if count == 0 and peer2 is not None and peer2.connected2:
209                     peer = peer2
210                 else:
211                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
212                     if peer is None:
213                         continue
214                 
215                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
216                 peer.send_getshares(
217                     hashes=[share_hash],
218                     parents=2000,
219                     stops=list(set(tracker.heads) | set(
220                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
221                     ))[:100],
222                 )
223                 requested[share_hash] = t, count + 1
224         pre_current_work.changed.watch(lambda _: set_real_work2())
225         pre_merged_work.changed.watch(lambda _: set_real_work2())
226         set_real_work2()
227         print '    ...success!'
228         print
229         
230         
231         @defer.inlineCallbacks
232         def set_merged_work(merged_url, merged_userpass):
233             merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
234             while True:
235                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
236                 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
237                     hash=int(auxblock['hash'], 16),
238                     target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
239                     merged_proxy=merged_proxy,
240                 )}))
241                 yield deferral.sleep(1)
242         for merged_url, merged_userpass in merged_urls:
243             set_merged_work(merged_url, merged_userpass)
244         
245         @pre_merged_work.changed.watch
246         def _(new_merged_work):
247             print 'Got new merged mining work!'
248         
249         # setup p2p logic and join p2pool network
250         
251         class Node(p2p.Node):
252             def handle_shares(self, shares, peer):
253                 if len(shares) > 5:
254                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
255                 
256                 new_count = 0
257                 for share in shares:
258                     if share.hash in tracker.shares:
259                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
260                         continue
261                     
262                     new_count += 1
263                     
264                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
265                     
266                     tracker.add(share)
267                 
268                 if shares and peer is not None:
269                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
270                 
271                 if new_count:
272                     set_real_work2()
273                 
274                 if len(shares) > 5:
275                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
276             
277             def handle_share_hashes(self, hashes, peer):
278                 t = time.time()
279                 get_hashes = []
280                 for share_hash in hashes:
281                     if share_hash in tracker.shares:
282                         continue
283                     last_request_time, count = requested.get(share_hash, (None, 0))
284                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
285                         continue
286                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
287                     get_hashes.append(share_hash)
288                     requested[share_hash] = t, count + 1
289                 
290                 if hashes and peer is not None:
291                     peer_heads.setdefault(hashes[0], set()).add(peer)
292                 if get_hashes:
293                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
294             
295             def handle_get_shares(self, hashes, parents, stops, peer):
296                 parents = min(parents, 1000//len(hashes))
297                 stops = set(stops)
298                 shares = []
299                 for share_hash in hashes:
300                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
301                         if share.hash in stops:
302                             break
303                         shares.append(share)
304                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
305                 peer.sendShares(shares)
306         
307         @deferral.retry('Error submitting block: (will retry)', 10, 10)
308         @defer.inlineCallbacks
309         def submit_block(block, ignore_failure):
310             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
311             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
312             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
313                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
314         
315         @tracker.verified.added.watch
316         def _(share):
317             if share.pow_hash <= share.header['bits'].target:
318                 submit_block(share.as_block(tracker), ignore_failure=True)
319                 print
320                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
321                 print
322                 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
323         
324         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
325         
326         @defer.inlineCallbacks
327         def parse(x):
328             if ':' in x:
329                 ip, port = x.split(':')
330                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
331             else:
332                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
333         
334         addrs = {}
335         if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
336             try:
337                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
338             except:
339                 print >>sys.stderr, "error reading addrs"
340         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
341             try:
342                 addr = yield addr_df
343                 if addr not in addrs:
344                     addrs[addr] = (0, time.time(), time.time())
345             except:
346                 log.err()
347         
348         connect_addrs = set()
349         for addr_df in map(parse, args.p2pool_nodes):
350             try:
351                 connect_addrs.add((yield addr_df))
352             except:
353                 log.err()
354         
355         p2p_node = Node(
356             best_share_hash_func=lambda: current_work.value['best_share_hash'],
357             port=args.p2pool_port,
358             net=net,
359             addr_store=addrs,
360             connect_addrs=connect_addrs,
361         )
362         p2p_node.start()
363         
364         task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
365         
366         # send share when the chain changes to their chain
367         def work_changed(new_work):
368             #print 'Work changed:', new_work
369             shares = []
370             for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
371                 if share.hash in shared_share_hashes:
372                     break
373                 shared_share_hashes.add(share.hash)
374                 shares.append(share)
375             
376             for peer in p2p_node.peers.itervalues():
377                 peer.sendShares([share for share in shares if share.peer is not peer])
378         
379         current_work.changed.watch(work_changed)
380         
381         def save_shares():
382             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
383                 ss.add_share(share)
384                 if share.hash in tracker.verified.shares:
385                     ss.add_verified_hash(share.hash)
386         task.LoopingCall(save_shares).start(60)
387         
388         print '    ...success!'
389         print
390         
391         if args.upnp:
392             @defer.inlineCallbacks
393             def upnp_thread():
394                 while True:
395                     try:
396                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
397                         if is_lan:
398                             pm = yield portmapper.get_port_mapper()
399                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
400                     except defer.TimeoutError:
401                         pass
402                     except:
403                         if p2pool.DEBUG:
404                             log.err(None, 'UPnP error:')
405                     yield deferral.sleep(random.expovariate(1/120))
406             upnp_thread()
407         
408         # start listening for workers with a JSON-RPC server
409         
410         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
411         
412         if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
413             with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
414                 vip_pass = f.read().strip('\r\n')
415         else:
416             vip_pass = '%016x' % (random.randrange(2**64),)
417             with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
418                 f.write(vip_pass)
419         print '    Worker password:', vip_pass, '(only required for generating graphs)'
420         
421         # setup worker logic
422         
423         removed_unstales_var = variable.Variable((0, 0, 0))
424         removed_doa_unstales_var = variable.Variable(0)
425         @tracker.verified.removed.watch
426         def _(share):
427             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
428                 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
429                 removed_unstales_var.set((
430                     removed_unstales_var.value[0] + 1,
431                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
432                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
433                 ))
434             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
435                 removed_doa_unstales.set(removed_doa_unstales.value + 1)
436         
437         def get_stale_counts():
438             '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
439             my_shares = len(my_share_hashes)
440             my_doa_shares = len(my_doa_share_hashes)
441             delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
442             my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
443             my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
444             orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
445             doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
446             
447             my_shares_not_in_chain = my_shares - my_shares_in_chain
448             my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
449             
450             return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
451         
452         
453         pseudoshare_received = variable.Event()
454         local_rate_monitor = math.RateMonitor(10*60)
455         
456         class WorkerBridge(worker_interface.WorkerBridge):
457             def __init__(self):
458                 worker_interface.WorkerBridge.__init__(self)
459                 self.new_work_event = current_work.changed
460                 self.recent_shares_ts_work = []
461             
462             def preprocess_request(self, request):
463                 user = request.getUser() if request.getUser() is not None else ''
464                 
465                 desired_pseudoshare_target = None
466                 if '+' in user:
467                     user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
468                     try:
469                         desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
470                     except:
471                         pass
472                 
473                 desired_share_target = 2**256 - 1
474                 if '/' in user:
475                     user, min_diff_str = user.rsplit('/', 1)
476                     try:
477                         desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
478                     except:
479                         pass
480                 
481                 if random.uniform(0, 100) < args.worker_fee:
482                     pubkey_hash = my_pubkey_hash
483                 else:
484                     try:
485                         pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
486                     except: # XXX blah
487                         pubkey_hash = my_pubkey_hash
488                 
489                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
490             
491             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
492                 if len(p2p_node.peers) == 0 and net.PERSIST:
493                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
494                 if current_work.value['best_share_hash'] is None and net.PERSIST:
495                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
496                 if time.time() > current_work2.value['last_update'] + 60:
497                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
498                 
499                 if current_work.value['mm_chains']:
500                     tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
501                     mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
502                     mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
503                         merkle_root=bitcoin_data.merkle_hash(mm_hashes),
504                         size=size,
505                         nonce=0,
506                     ))
507                     mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
508                 else:
509                     mm_data = ''
510                     mm_later = []
511                 
512                 share_info, generate_tx = p2pool_data.generate_transaction(
513                     tracker=tracker,
514                     share_data=dict(
515                         previous_share_hash=current_work.value['best_share_hash'],
516                         coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
517                         nonce=random.randrange(2**32),
518                         pubkey_hash=pubkey_hash,
519                         subsidy=current_work2.value['subsidy'],
520                         donation=math.perfect_round(65535*args.donation_percentage/100),
521                         stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
522                             253 if orphans > orphans_recorded_in_chain else
523                             254 if doas > doas_recorded_in_chain else
524                             0
525                         )(*get_stale_counts()),
526                     ),
527                     block_target=current_work.value['bits'].target,
528                     desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
529                     desired_target=desired_share_target,
530                     net=net,
531                 )
532                 
533                 target = net.PARENT.SANE_MAX_TARGET
534                 if desired_pseudoshare_target is None:
535                     if len(self.recent_shares_ts_work) == 50:
536                         hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
537                         target = min(target, 2**256//hash_rate)
538                 else:
539                     target = min(target, desired_pseudoshare_target)
540                 target = max(target, share_info['bits'].target)
541                 for aux_work in current_work.value['mm_chains'].itervalues():
542                     target = max(target, aux_work['target'])
543                 
544                 transactions = [generate_tx] + list(current_work2.value['transactions'])
545                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
546                 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
547                 
548                 getwork_time = time.time()
549                 merkle_branch = current_work2.value['merkle_branch']
550                 
551                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
552                     bitcoin_data.target_to_difficulty(target),
553                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
554                     current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
555                     len(current_work2.value['transactions']),
556                 )
557                 
558                 ba = bitcoin_getwork.BlockAttempt(
559                     version=current_work.value['version'],
560                     previous_block=current_work.value['previous_block'],
561                     merkle_root=merkle_root,
562                     timestamp=current_work2.value['time'],
563                     bits=current_work.value['bits'],
564                     share_target=target,
565                 )
566                 
567                 received_header_hashes = set()
568                 
569                 def got_response(header, request):
570                     assert header['merkle_root'] == merkle_root
571                     
572                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
573                     pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
574                     on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
575                     
576                     try:
577                         if pow_hash <= header['bits'].target or p2pool.DEBUG:
578                             submit_block(dict(header=header, txs=transactions), ignore_failure=False)
579                             if pow_hash <= header['bits'].target:
580                                 print
581                                 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
582                                 print
583                                 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
584                     except:
585                         log.err(None, 'Error while processing potential block:')
586                     
587                     for aux_work, index, hashes in mm_later:
588                         try:
589                             if pow_hash <= aux_work['target'] or p2pool.DEBUG:
590                                 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
591                                     pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
592                                     bitcoin_data.aux_pow_type.pack(dict(
593                                         merkle_tx=dict(
594                                             tx=transactions[0],
595                                             block_hash=header_hash,
596                                             merkle_branch=merkle_branch,
597                                             index=0,
598                                         ),
599                                         merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
600                                         index=index,
601                                         parent_block_header=header,
602                                     )).encode('hex'),
603                                 )
604                                 @df.addCallback
605                                 def _(result):
606                                     if result != (pow_hash <= aux_work['target']):
607                                         print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
608                                     else:
609                                         print 'Merged block submittal result: %s' % (result,)
610                                 @df.addErrback
611                                 def _(err):
612                                     log.err(err, 'Error submitting merged block:')
613                         except:
614                             log.err(None, 'Error while processing merged mining POW:')
615                     
616                     if pow_hash <= share_info['bits'].target:
617                         min_header = dict(header);del min_header['merkle_root']
618                         hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
619                         share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
620                         
621                         print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
622                             request.getUser(),
623                             p2pool_data.format_hash(share.hash),
624                             p2pool_data.format_hash(share.previous_hash),
625                             time.time() - getwork_time,
626                             ' DEAD ON ARRIVAL' if not on_time else '',
627                         )
628                         my_share_hashes.add(share.hash)
629                         if not on_time:
630                             my_doa_share_hashes.add(share.hash)
631                         
632                         tracker.add(share)
633                         if not p2pool.DEBUG:
634                             tracker.verified.add(share)
635                         set_real_work2()
636                         
637                         try:
638                             if pow_hash <= header['bits'].target or p2pool.DEBUG:
639                                 for peer in p2p_node.peers.itervalues():
640                                     peer.sendShares([share])
641                                 shared_share_hashes.add(share.hash)
642                         except:
643                             log.err(None, 'Error forwarding block solution:')
644                     
645                     if pow_hash > target:
646                         print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
647                         print '    Hash:   %56x' % (pow_hash,)
648                         print '    Target: %56x' % (target,)
649                     elif header_hash in received_header_hashes:
650                         print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
651                     else:
652                         received_header_hashes.add(header_hash)
653                         
654                         pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
655                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
656                         while len(self.recent_shares_ts_work) > 50:
657                             self.recent_shares_ts_work.pop(0)
658                         local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
659                     
660                     return on_time
661                 
662                 return ba, got_response
663         
664         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
665         
666         web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
667         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
668         
669         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
670         
671         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
672             pass
673         
674         print '    ...success!'
675         print
676         
677         
678         @defer.inlineCallbacks
679         def work_poller():
680             while True:
681                 flag = factory.new_block.get_deferred()
682                 try:
683                     yield set_real_work1()
684                 except:
685                     log.err()
686                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
687         work_poller()
688         
689         
690         # done!
691         print 'Started successfully!'
692         print
693         
694         
695         if hasattr(signal, 'SIGALRM'):
696             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
697                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
698             ))
699             signal.siginterrupt(signal.SIGALRM, False)
700             task.LoopingCall(signal.alarm, 30).start(1)
701         
702         if args.irc_announce:
703             from twisted.words.protocols import irc
704             class IRCClient(irc.IRCClient):
705                 nickname = 'p2pool%02i' % (random.randrange(100),)
706                 channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
707                 def lineReceived(self, line):
708                     print repr(line)
709                     irc.IRCClient.lineReceived(self, line)
710                 def signedOn(self):
711                     irc.IRCClient.signedOn(self)
712                     self.factory.resetDelay()
713                     self.join(self.channel)
714                     self.watch_id = tracker.verified.added.watch(self._new_share)
715                     self.announced_hashes = set()
716                     self.delayed_messages = {}
717                 def privmsg(self, user, channel, message):
718                     if channel == self.channel and message in self.delayed_messages:
719                         self.delayed_messages.pop(message).cancel()
720                 def _new_share(self, share):
721                     if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
722                         self.announced_hashes.add(share.header_hash)
723                         message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
724                         self.delayed_messages[message] = reactor.callLater(random.expovariate(1/60), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
725                 def connectionLost(self, reason):
726                     tracker.verified.added.unwatch(self.watch_id)
727                     print 'IRC connection lost:', reason.getErrorMessage()
            # Client factory that creates IRCClient protocol instances. The retry
            # behaviour comes from Twisted's ReconnectingClientFactory base class;
            # IRCClient.signedOn calls resetDelay() on this factory after sign-on.
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
730             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
731         
732         @defer.inlineCallbacks
733         def status_thread():
734             last_str = None
735             last_time = 0
736             while True:
737                 yield deferral.sleep(3)
738                 try:
739                     if time.time() > current_work2.value['last_update'] + 60:
740                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
741                     
742                     height = tracker.get_height(current_work.value['best_share_hash'])
743                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
744                         height,
745                         len(tracker.verified.shares),
746                         len(tracker.shares),
747                         len(p2p_node.peers),
748                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
749                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
750                     
751                     datums, dt = local_rate_monitor.get_datums_in_last()
752                     my_att_s = sum(datum['work']/dt for datum in datums)
753                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
754                         math.format(int(my_att_s)),
755                         math.format_dt(dt),
756                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
757                         math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
758                     )
759                     
760                     if height > 2:
761                         (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
762                         stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
763                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
764                         
765                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
766                             shares, stale_orphan_shares, stale_doa_shares,
767                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
768                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
769                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
770                         )
771                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
772                             math.format(int(real_att_s)),
773                             100*stale_prop,
774                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
775                         )
776                     
777                     if this_str != last_str or time.time() > last_time + 15:
778                         print this_str
779                         last_str = this_str
780                         last_time = time.time()
781                 except:
782                     log.err()
783         status_thread()
784     except:
785         log.err(None, 'Fatal error:')
786         reactor.stop()
787
788 def run():
789     class FixedArgumentParser(argparse.ArgumentParser):
790         def _read_args_from_files(self, arg_strings):
791             # expand arguments referencing files
792             new_arg_strings = []
793             for arg_string in arg_strings:
794                 
795                 # for regular arguments, just add them back into the list
796                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
797                     new_arg_strings.append(arg_string)
798                 
799                 # replace arguments referencing files with the file content
800                 else:
801                     try:
802                         args_file = open(arg_string[1:])
803                         try:
804                             arg_strings = []
805                             for arg_line in args_file.read().splitlines():
806                                 for arg in self.convert_arg_line_to_args(arg_line):
807                                     arg_strings.append(arg)
808                             arg_strings = self._read_args_from_files(arg_strings)
809                             new_arg_strings.extend(arg_strings)
810                         finally:
811                             args_file.close()
812                     except IOError:
813                         err = sys.exc_info()[1]
814                         self.error(str(err))
815             
816             # return the modified argument list
817             return new_arg_strings
818         
819         def convert_arg_line_to_args(self, arg_line):
820             return [arg for arg in arg_line.split() if arg.strip()]
821     
822     
823     realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
824     
825     parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
826     parser.add_argument('--version', action='version', version=p2pool.__version__)
827     parser.add_argument('--net',
828         help='use specified network (default: bitcoin)',
829         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
830     parser.add_argument('--testnet',
831         help='''use the network's testnet''',
832         action='store_const', const=True, default=False, dest='testnet')
833     parser.add_argument('--debug',
834         help='enable debugging mode',
835         action='store_const', const=True, default=False, dest='debug')
836     parser.add_argument('-a', '--address',
837         help='generate payouts to this address (default: <address requested from bitcoind>)',
838         type=str, action='store', default=None, dest='address')
839     parser.add_argument('--datadir',
840         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
841         type=str, action='store', default=None, dest='datadir')
842     parser.add_argument('--logfile',
843         help='''log to this file (default: data/<NET>/log)''',
844         type=str, action='store', default=None, dest='logfile')
845     parser.add_argument('--merged',
846         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
847         type=str, action='append', default=[], dest='merged_urls')
848     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
849         help='donate this percentage of work to author of p2pool (default: 0.5)',
850         type=float, action='store', default=0.5, dest='donation_percentage')
851     parser.add_argument('--irc-announce',
852         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
853         action='store_true', default=False, dest='irc_announce')
854     
855     p2pool_group = parser.add_argument_group('p2pool interface')
856     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
857         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
858         type=int, action='store', default=None, dest='p2pool_port')
859     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
860         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
861         type=str, action='append', default=[], dest='p2pool_nodes')
862     parser.add_argument('--disable-upnp',
863         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
864         action='store_false', default=True, dest='upnp')
865     
866     worker_group = parser.add_argument_group('worker interface')
867     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
868         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
869         type=str, action='store', default=None, dest='worker_endpoint')
870     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
871         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
872         type=float, action='store', default=0, dest='worker_fee')
873     
874     bitcoind_group = parser.add_argument_group('bitcoind interface')
875     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
876         help='connect to this address (default: 127.0.0.1)',
877         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
878     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
879         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
880         type=int, action='store', default=None, dest='bitcoind_rpc_port')
881     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
882         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
883         type=int, action='store', default=None, dest='bitcoind_p2p_port')
884     
885     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
886         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
887         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
888     
889     args = parser.parse_args()
890     
891     if args.debug:
892         p2pool.DEBUG = True
893     
894     net_name = args.net_name + ('_testnet' if args.testnet else '')
895     net = networks.nets[net_name]
896     
897     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
898     if not os.path.exists(datadir_path):
899         os.makedirs(datadir_path)
900     
901     if len(args.bitcoind_rpc_userpass) > 2:
902         parser.error('a maximum of two arguments are allowed')
903     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
904     
905     if args.bitcoind_rpc_password is None:
906         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
907             parser.error('This network has no configuration file function. Manually enter your RPC password.')
908         conf_path = net.PARENT.CONF_FILE_FUNC()
909         if not os.path.exists(conf_path):
910             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
911                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
912                 '''\r\n'''
913                 '''server=1\r\n'''
914                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
915         with open(conf_path, 'rb') as f:
916             cp = ConfigParser.RawConfigParser()
917             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
918             for conf_name, var_name, var_type in [
919                 ('rpcuser', 'bitcoind_rpc_username', str),
920                 ('rpcpassword', 'bitcoind_rpc_password', str),
921                 ('rpcport', 'bitcoind_rpc_port', int),
922                 ('port', 'bitcoind_p2p_port', int),
923             ]:
924                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
925                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
926         if args.bitcoind_rpc_password is None:
927             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
928     
929     if args.bitcoind_rpc_username is None:
930         args.bitcoind_rpc_username = ''
931     
932     if args.bitcoind_rpc_port is None:
933         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
934     
935     if args.bitcoind_p2p_port is None:
936         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
937     
938     if args.p2pool_port is None:
939         args.p2pool_port = net.P2P_PORT
940     
941     if args.worker_endpoint is None:
942         worker_endpoint = '', net.WORKER_PORT
943     elif ':' not in args.worker_endpoint:
944         worker_endpoint = '', int(args.worker_endpoint)
945     else:
946         addr, port = args.worker_endpoint.rsplit(':', 1)
947         worker_endpoint = addr, int(port)
948     
949     if args.address is not None:
950         try:
951             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
952         except Exception, e:
953             parser.error('error parsing address: ' + repr(e))
954     else:
955         args.pubkey_hash = None
956     
957     def separate_url(url):
958         s = urlparse.urlsplit(url)
959         if '@' not in s.netloc:
960             parser.error('merged url netloc must contain an "@"')
961         userpass, new_netloc = s.netloc.rsplit('@', 1)
962         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
963     merged_urls = map(separate_url, args.merged_urls)
964     
965     if args.logfile is None:
966         args.logfile = os.path.join(datadir_path, 'log')
967     
968     logfile = logging.LogFile(args.logfile)
969     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
970     sys.stdout = logging.AbortPipe(pipe)
971     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
972     if hasattr(signal, "SIGUSR1"):
973         def sigusr1(signum, frame):
974             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
975             logfile.reopen()
976             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
977         signal.signal(signal.SIGUSR1, sigusr1)
978     task.LoopingCall(logfile.reopen).start(5)
979     
980     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
981     reactor.run()