fix race issue with updating work/tracker
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import sys
12 import time
13
14 from twisted.internet import defer, reactor, task
15 from twisted.web import server
16 from twisted.python import log
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22 import p2pool as p2pool_init
23
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch (BlockAttempt, block height) from bitcoind over JSON-RPC.

    The getwork and getblocknumber requests are issued concurrently, so a
    block arriving between the two answers can make them briefly disagree.
    """
    work_df = bitcoind.rpc_getwork()
    height_df = bitcoind.rpc_getblocknumber()
    try:
        raw_work = yield work_df
        work = bitcoin.getwork.BlockAttempt.from_getwork(raw_work)
        height = yield height_df
    finally:
        # silence any residual failure so Twisted doesn't log it as unhandled
        for df in (work_df, height_df):
            df.addErrback(lambda fail: None)
    defer.returnValue((work, height))
36
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind (via a p2p checkorder probe) for a payout script.

    Returns the script bytes on 'success', None on 'denied' (caller then
    falls back to a getaccountaddress-derived script), and raises
    ValueError on any unrecognized reply.
    """
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    # bug fix: under inlineCallbacks a plain fall-off-the-end returns None,
    # so the computed script was silently discarded and callers always took
    # the denied/fallback path; the result must go through defer.returnValue
    defer.returnValue(my_script)
47
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script for the address of bitcoind's 'p2pool' account."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
52
@defer.inlineCallbacks
def main(args):
    """Run the p2pool node.

    Connects to bitcoind (JSON-RPC + bitcoin-p2p), joins the p2pool share
    network, serves getwork to miners over JSON-RPC, and loops forever
    printing pool statistics. Any fatal error stops the reactor.
    """
    try:
        print 'p2pool (version %s)' % (p2pool_init.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        temp_work, temp_height = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                print 'IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            # an explicit -a/--address on the command line overrides checkorder
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        # block fetcher over bitcoin-p2p, memoized for an hour
        @defer.inlineCallbacks
        def real_get_block(block_hash):
            block = yield (yield factory.getProtocol()).get_block(block_hash)
            print 'Got block %x' % (block_hash,)
            defer.returnValue(block)
        get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
        
        # raw-transaction fetcher over JSON-RPC, memoized briefly
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
        ht = bitcoin.p2p.HeightTracker(factory)
        
        tracker = p2pool.OkayTracker(args.net)
        chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            # NOTE(review): `Chain` is not defined anywhere visible in this
            # file - confirm this helper is dead code or that Chain is
            # brought into scope elsewhere before relying on it
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        # events that wake the work1/work2 polling threads below
        work_updated = variable.Event()
        tracker_updated = variable.Event()
        
        # (peer_nonce, share_hash) pairs already requested from peers;
        # cleared every minute so lost requests eventually get retried
        requested = set()
        task.LoopingCall(requested.clear).start(60)
        
        @defer.inlineCallbacks
        def set_real_work1():
            # refresh bitcoind's view: new block template and height,
            # preserving whatever best_share_hash we already had
            work, height = yield getwork(bitcoind)
            # XXX call tracker_updated
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                height=height,
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                clock_offset=time.time() - work.timestamp,
            ))
        
        @defer.inlineCallbacks
        def set_real_work2():
            # re-evaluate the share tracker's best chain and request any
            # desired (missing-parent) shares from the peers that told us
            # about them
            best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            
            
            #if some_new:
            #    share = shares[0]
            #    
            #    # instead, trigger main thread to call set_work
            #    best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            #    
            #    if best == share.hash:
            #        print ('MINE: ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash % 2**32,)
            #    else:
            #        print ('MINE: ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash % 2**32,)
            #    
            #    w = dict(current_work.value)
            #    w['best_share_hash'] = best
            #    current_work.set(w)
            
            for peer2, share_hash in desired:
                if peer2 is None:
                    continue
                if (peer2.nonce, share_hash) in requested:
                    continue
                print 'Requesting parent share %x' % (share_hash % 2**32,)
                peer2.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    )),
                )
                requested.add((peer2.nonce, share_hash))
        
        print 'Initializing work...'
        yield set_real_work1()
        yield set_real_work2()
        print '    ...success!'
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def share_share(share, ignore_peer=None):
            # broadcast a share to every connected peer except its source
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                peer.send_shares([share])
            share.flag_shared()
        
        def p2p_shares(shares, peer=None):
            # handle a batch of shares received from a peer (or locally
            # mined, in which case peer is None)
            if len(shares) > 5:
                print "Processing %i shares..." % (len(shares),)
            
            some_new = False
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
                    continue
                some_new = True
                
                #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.transport.getPeer() if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                # a share that also meets the bitcoin target is a full block
                if share.bitcoin_hash <= share.header['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    else:
                        print 'No bitcoind connection! Erp!'
            
            # wake work2_thread so the tracker re-picks its best chain
            if some_new:
                tracker_updated.happened()
            
            if len(shares) > 5:
                print "... done processing %i shares." % (len(shares),)
        
        def p2p_share_hashes(share_hashes, peer):
            # a peer announced share hashes; request the ones we don't have
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
                else:
                    print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
                    get_hashes.append(share_hash)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            # serve a peer's getshares request, capping the total number of
            # shares sent at roughly 1000
            # NOTE(review): an empty share_hashes list would raise
            # ZeroDivisionError here - presumably peers never send that;
            # confirm against the protocol handler
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            peer.send_shares(shares, full=True)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            # parse an ADDR[:PORT] bootstrap-node argument
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        # built-in bootstrap nodes
        nodes = [
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
        ]
        try:
            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
        except:
            print
            print 'Error resolving bootstrap node IP:'
            log.err()
            print
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            # walk back from the new best share, broadcasting every share
            # not yet flagged as shared
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle_root -> transaction list for work we handed out, so
        # returned proofs-of-work can be matched back up
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # random per-process tag embedded in generated coinbase nonces
        run_identifier = struct.pack('<Q', random.randrange(2**64))
        
        def compute(state, all_targets):
            """Build a getwork BlockAttempt for a miner from current state.

            Selects mempool transactions (tx_pool and times are defined
            later in main; they are bound at call time via closure),
            builds the p2pool generate transaction, and records the
            merkle-root -> transactions mapping for got_response.
            """
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=my_script,
                # block subsidy (halving every 210000 blocks) plus fees
                subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                block_target=state['target'],
                net=args.net,
            )
            print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
            
            # clamp the timestamp to be after the median of the last 11 shares
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            if not all_targets:
                # ease the share target for miners that can't handle high
                # difficulty targets
                target2 = min(2**256//2**32 - 1, target2)
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
        
        # hashes of shares we mined ourselves
        my_shares = set()
        # coinbase nonce -> time the work was handed out (for latency stats)
        times = {}
        
        def got_response(data):
            """Process a miner's submitted getwork result.

            Returns True when the submission produced a share extending the
            current best chain, False otherwise (including on any error).
            """
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if hash_ > target:
                    print 'Received invalid share from worker - %x/%x' % (hash_, target)
                    return False
                share = p2pool.Share.from_block(block)
                my_shares.add(share.hash)
                print 'GOT SHARE! %x prev %x' % (share.hash % 2**32, 0 if share.previous_hash is None else share.previous_hash % 2**32), "DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "", time.time() - times[share.nonce], "s since getwork"
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                print
                print 'Error processing data received from worker:'
                log.err()
                print
                return False
        
        def get_rate():
            # pool-wide attempts per second, or None before any shares exist
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
                return att_s
        
        reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
        
        print '    ...success!'
        print
        
        # done!
        
        def get_blocks(start_hash):
            # walk cached blocks backwards from start_hash; stops at the
            # first block not already in the cache
            while True:
                try:
                    block = get_block.call_now(start_hash)
                except deferral.NotNowError:
                    break
                yield start_hash, block
                start_hash = block['header']['previous_block']
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        
        class Tx(object):
            # wrapper around a mempool transaction, tracking whether its
            # parents are confirmed and whether it is safe to include
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                # asynchronously verify every input's parent tx is in a
                # block, accumulating value_in along the way; bails out
                # silently on any lookup failure
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                x = self.is_good2()
                #print 'is_good:', x
                return x
            
            def is_good2(self):
                # scan the last 10 cached blocks for double-spends of our
                # inputs; True only if we reach seen_at_block cleanly
                for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
                    if block_hash == self.seen_at_block:
                        return True
                    for tx in block['txs']:
                        mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
                        if mentions & self.mentions:
                            return False
                return False
        
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            # fetch an announced transaction and add it to the mempool view
            try:
                assert isinstance(tx_hash, (int, long))
                #print "REQUESTING", tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print "GOT", tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                print
                print 'Error handling tx:'
                log.err()
                print
        # disable for now, for testing impact on stales
        #factory.new_tx.watch(new_tx)
        
        def new_block(block):
            # a new bitcoin block arrived: wake work1_thread to re-getwork
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        @defer.inlineCallbacks
        def work1_thread():
            # poll bitcoind work: on new-block events or roughly every second
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
        
        
        @defer.inlineCallbacks
        def work2_thread():
            # re-think the share tracker: on new-share events or roughly
            # every second
            while True:
                flag = tracker_updated.get_deferred()
                try:
                    yield set_real_work2()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
        
        work1_thread()
        work2_thread()
        
        counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
        
        # main status loop: print pool statistics about once a second
        while True:
            yield deferral.sleep(random.expovariate(1/1))
            try:
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 5:
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
                        count = counter(current_work.value['best_share_hash'], height, 2**100)
                        print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale)' % (
                            math.format(att_s),
                            height,
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(weights.get(my_script, 0)/total_weight*att_s),
                            len(my_shares),
                            len(my_shares) - count,
                        )
                        #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
                        #for k, v in weights.iteritems():
                        #    print k.encode('hex'), v/total_weight
            except:
                log.err()
    except:
        print
        print 'Fatal error:'
        log.err()
        print
        reactor.stop()
522
523 def run():
524     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
525     parser.add_argument('--version', action='version', version=p2pool_init.__version__)
526     parser.add_argument('--testnet',
527         help='use the testnet',
528         action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
529     parser.add_argument('--debug',
530         help='debugging mode',
531         action='store_const', const=True, default=False, dest='debug')
532     parser.add_argument('-a', '--address',
533         help='generate to this address (defaults to requesting one from bitcoind)',
534         type=str, action='store', default=None, dest='address')
535     
536     p2pool_group = parser.add_argument_group('p2pool interface')
537     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
538         help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
539         type=int, action='store', default=None, dest='p2pool_port')
540     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
541         help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
542         type=str, action='append', default=[], dest='p2pool_nodes')
543     parser.add_argument('-l', '--low-bandwidth',
544         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
545         action='store_true', default=False, dest='low_bandwidth')
546     
547     worker_group = parser.add_argument_group('worker interface')
548     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
549         help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
550         type=int, action='store', default=9332, dest='worker_port')
551     
552     bitcoind_group = parser.add_argument_group('bitcoind interface')
553     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
554         help='connect to a bitcoind at this address (default: 127.0.0.1)',
555         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
556     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
557         help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
558         type=int, action='store', default=8332, dest='bitcoind_rpc_port')
559     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
560         help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
561         type=int, action='store', default=None, dest='bitcoind_p2p_port')
562     
563     bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
564         help='bitcoind RPC interface username',
565         type=str, action='store', dest='bitcoind_rpc_username')
566     bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
567         help='bitcoind RPC interface password',
568         type=str, action='store', dest='bitcoind_rpc_password')
569     
570     args = parser.parse_args()
571     
572     if args.debug:
573         p2pool_init.DEBUG = True
574         class TimestampingPipe(object):
575             def __init__(self, inner_file):
576                 self.inner_file = inner_file
577                 self.buf = ""
578             def write(self, data):
579                 buf = self.buf + data
580                 lines = buf.split('\n')
581                 for line in lines[:-1]:
582                     self.inner_file.write("%s %s\n" % (time.strftime("%H:%M:%S"), line))
583                 self.buf = lines[-1]
584         sys.stdout = TimestampingPipe(sys.stdout)
585         sys.stderr = TimestampingPipe(sys.stderr)
586     
587     if args.bitcoind_p2p_port is None:
588         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
589     
590     if args.p2pool_port is None:
591         args.p2pool_port = args.net.P2P_PORT
592     
593     if args.address is not None:
594         try:
595             args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
596         except Exception, e:
597             raise ValueError("error parsing address: " + repr(e))
598     else:
599         args.pubkey_hash = None
600     
601     reactor.callWhenRunning(main, args)
602     reactor.run()