deasynchronize tracker.think
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
19
20 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
21 from util import db, expiring_dict, jsonrpc, variable, deferral, math
22 from . import p2p, worker_interface, skiplists
23 import p2pool.data as p2pool
24 import p2pool as p2pool_init
25
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch current mining work and block height from bitcoind over JSON-RPC.

    Fires both RPC calls concurrently and returns (via the deferred) a tuple
    (bitcoin.getwork.BlockAttempt, height). Retried up to 3 times by the
    @deferral.retry decorator on failure.
    """
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    finally:
        # get rid of residual errors
        # If one yield raised, the other deferred may still fail later; the
        # no-op errbacks keep Twisted from logging an unhandled-error warning.
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind (over the bitcoin p2p protocol) for a payout script.

    Sends a null checkorder request and fires with the script on 'success',
    None on 'denied', and raises ValueError on any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'denied':
        defer.returnValue(None)
    if reply != 'success':
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(res['script'])
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from bitcoind's 'p2pool' account address.

    Fallback path used when the checkorder/IP-transaction route is denied.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
def main(args):
    """Run the p2pool node.

    Connects to bitcoind (JSON-RPC and bitcoin-p2p), joins the p2pool share
    chain network, serves getwork to miners over JSON-RPC, and loops forever
    printing status. Any fatal error stops the reactor.
    """
    try:
        if args.charts:
            from . import draw  # imported lazily; requires PIL and pygame
        
        print 'p2pool (version %s)' % (p2pool_init.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        temp_work, temp_height = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                # checkorder route denied; fall back to an account address
                print 'IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            # explicit -a/--address wins over anything bitcoind offered
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        ht = bitcoin.p2p.HeightTracker(factory)
        
        tracker = p2pool.OkayTracker(args.net)
        chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            # NOTE(review): `Chain` is not defined or imported anywhere in this
            # file; this helper appears to be dead code from an older design —
            # confirm before relying on it.
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        # events used by the work1/work2 threads below to wake up early
        work_updated = variable.Event()
        tracker_updated = variable.Event()
        
        # share_hash -> (last_request_time, request_count); throttles getshares
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            """Refresh current_work/current_work2 from bitcoind's getwork."""
            work, height = yield getwork(bitcoind)
            # XXX call tracker_updated
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                height=height,
                # preserve the share-chain head chosen by set_real_work2
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                clock_offset=time.time() - work.timestamp,
            ))
        
        def set_real_work2():
            """Re-evaluate the best share-chain head and request missing parents."""
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                #if share_hash not in tracker.tails: # was received in the time tracker.think was running
                #    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff: skip if we asked for this hash recently
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # mostly pick a random peer that knows this chain, but 20%
                    # of the time (or with no candidates) fall back to peer2
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop at heads we already have (or a few shares below them)
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        
        print 'Initializing work...'
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def share_share(share, ignore_peer=None):
            """Broadcast a share to all connected peers except ignore_peer."""
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                if p2pool_init.DEBUG:
                    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
                peer.send_shares([share])
            share.flag_shared()
        
        def p2p_shares(shares, peer=None):
            """Handle shares received from the network (or from our own miner)."""
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            some_new = False
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                some_new = True
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                # a share that also meets the bitcoin target is a full block
                if share.bitcoin_hash <= share.header['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    else:
                        print 'No bitcoind connection! Erp!'
            
            # remember which peer knows about this chain head
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if some_new:
                tracker_updated.happened()
            
            if len(shares) > 5:
                print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
        
        def p2p_share_hashes(share_hashes, peer):
            """Handle advertised share hashes: request the ones we don't have."""
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # same backoff window as set_real_work2
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            """Serve a peer's getshares request, bounded to ~1000 shares total."""
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            peer.send_shares(shares, full=True)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            """Parse an 'addr' or 'addr:port' string into an (ip, port) tuple."""
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        # hard-coded bootstrap nodes, plus one resolved by DNS below
        nodes = [
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
        ]
        try:
            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
        except:
            log.err(None, 'Error resolving bootstrap node IP:')
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            # persist known peer addresses next to the executable
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            # walk back from the new head, broadcasting any not-yet-shared shares
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle_root -> transaction list, so got_response can rebuild the block
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # random per-run tag embedded in our coinbase nonces, used by `counter`
        # below to recognize our own shares in the chain
        run_identifier = struct.pack('<Q', random.randrange(2**64))
        
        def compute(state, payout_script):
            """Build a getwork BlockAttempt for a miner from the current state.

            Raises jsonrpc.Error while the share chain is still downloading on
            persistent networks.
            """
            if payout_script is None:
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            # `tx_pool` is defined later in main(); empty until new_tx is enabled
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            extra_txs = []
            size = 0
            # greedily include transactions up to ~500kB of block space
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=payout_script,
                # block subsidy (halving every 210000 blocks) plus fees
                subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                block_target=state['target'],
                net=args.net,
            )
            print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
            
            # clamp the timestamp to be greater than the median of the last 11
            # shares, mirroring bitcoin's median-time-past rule
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            # record send time keyed by nonce so got_response can report share age
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
        
        my_shares = set()  # hashes of shares our own miners produced
        times = {}  # coinbase nonce -> time work was handed out
        
        def got_response(data):
            """Process a miner's getwork submission; return True if it was a good share."""
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if hash_ > target:
                    print 'Received invalid share from worker - %x/%x' % (hash_, target)
                    return False
                share = p2pool.Share.from_block(block)
                my_shares.add(share.hash)
                print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
        
        def get_rate():
            """JSON endpoint: pool attempts/second over up to the last 720 shares."""
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
                return json.dumps(att_s)
            return json.dumps(None)
        
        def get_users():
            """JSON endpoint: payout-script hex -> fraction of recent chain weight."""
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[script.encode('hex')] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            """Tiny twisted.web resource wrapping a zero-argument callable."""
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        if args.charts:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
        class Tx(object):
            """A mempool transaction plus enough info to judge inclusion-worthiness."""
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                # our own hash plus the hashes of every input's previous output
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                # kicked off asynchronously; fills in value_in/parents_all_in_blocks
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                """Look up each input's parent tx; set parents_all_in_blocks when all confirmed."""
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                # NOTE(review): is_good2 is not defined anywhere in this file;
                # calling is_good with confirmed parents would raise
                # AttributeError — confirm whether this path is ever reached
                # (new_tx is currently disabled below).
                x = self.is_good2()
                #print 'is_good:', x
                return x
        
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            """Fetch an announced transaction from bitcoind and add it to tx_pool."""
            try:
                assert isinstance(tx_hash, (int, long))
                #print 'REQUESTING', tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print 'GOT', tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                log.err(None, 'Error handling tx:')
        # disable for now, for testing impact on stales
        #factory.new_tx.watch(new_tx)
        
        def new_block(block_hash):
            # a new bitcoin block invalidates our current work
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        @defer.inlineCallbacks
        def work1_thread():
            """Loop: poll bitcoind for work, waking early on work_updated."""
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                # wait for the event or ~1s (exponentially distributed), whichever first
                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            """Loop: re-think the share chain, waking early on tracker_updated."""
            while True:
                flag = tracker_updated.get_deferred()
                try:
                    set_real_work2()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
        
        work1_thread()
        work2_thread()
        
        # counts our shares (tagged with run_identifier) in the current chain
        counter = skiplists.CountsSkipList(tracker, run_identifier)
        
        # status-printing loop; runs for the life of the process
        while True:
            yield deferral.sleep(random.expovariate(1/1))
            try:
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 5:
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
                        matching_in_chain = counter(current_work.value['best_share_hash'], height)
                        shares_in_chain = my_shares & matching_in_chain
                        stale_shares = my_shares - matching_in_chain
                        print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
                            math.format(att_s),
                            height,
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(weights.get(my_script, 0)/total_weight*att_s),
                            len(shares_in_chain) + len(stale_shares),
                            len(stale_shares),
                            len(p2p_node.peers),
                        )
                        #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
                        #for k, v in weights.iteritems():
                        #    print k.encode('hex'), v/total_weight
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
        reactor.stop()
524
def run():
    """Entry point: parse command-line arguments and start the reactor.

    Builds the argparse CLI, applies network-dependent port defaults,
    optionally installs timestamped tee-logging for --debug, then schedules
    main() and runs the Twisted reactor until it stops.
    """
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--testnet',
        help='use the testnet',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--charts',
        help='generate charts on the web interface (requires PIL and pygame)',
        action='store_const', const=True, default=False, dest='charts')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional arguments: RPC credentials
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
        class TeePipe(object):
            """File-like object that duplicates writes to several outputs."""
            def __init__(self, outputs):
                self.outputs = outputs
            def write(self, data):
                for output in self.outputs:
                    output.write(data)
            def flush(self):
                for output in self.outputs:
                    output.flush()
        class TimestampingPipe(object):
            """File-like wrapper that prefixes each complete line with a timestamp."""
            def __init__(self, inner_file):
                self.inner_file = inner_file
                self.buf = ''  # holds the current partial (unterminated) line
                self.softspace = 0  # required by Python 2's print statement
            def write(self, data):
                buf = self.buf + data
                lines = buf.split('\n')
                for line in lines[:-1]:
                    self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                    self.inner_file.flush()
                self.buf = lines[-1]
            def flush(self):
                pass
        # mirror all output to stderr and a debug.log beside the executable
        sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
    
    # network-dependent port defaults, applied only when not given explicitly
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    reactor.callWhenRunning(main, args)
    reactor.run()