display share count while downloading
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14
15 from twisted.internet import defer, reactor
16 from twisted.web import server
17 from twisted.python import log
18
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
23 import p2pool as p2pool_init
24
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch current getwork data and block height from bitcoind over JSON-RPC.

    Fires with (BlockAttempt, height). Retried every 3 seconds on failure
    (via deferral.retry).
    """
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    finally:
        # get rid of residual errors - if one deferred failed, silence the
        # other so Twisted doesn't log an unhandled-error for it
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
37
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind (via a bitcoin-p2p checkorder message) for a payout script.

    Fires with the script bytes on success, or None if the IP transaction
    was denied. Raises ValueError on any other reply.
    """
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    # BUG FIX: the computed script was never handed back to the caller.
    # An inlineCallbacks generator fires with None unless returnValue is
    # called, so the 'success' path behaved exactly like the 'denied' path.
    defer.returnValue(my_script)
48
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fall back to paying out to bitcoind's 'p2pool' account address."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
53
@defer.inlineCallbacks
def main(args):
    """Start the p2pool node: connect to bitcoind, join the p2p share network,
    and serve work to local miners over getwork."""
    try:
        print 'p2pool (version %s)' % (p2pool_init.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        temp_work, temp_height = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                print 'IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            # explicit -a/--address on the command line overrides anything bitcoind offered
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        # tracks bitcoin block heights announced over the p2p connection
        ht = bitcoin.p2p.HeightTracker(factory)
        
        tracker = p2pool.OkayTracker(args.net)  # holds the share chain
        chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            # NOTE(review): 'Chain' is not defined anywhere in this file; this
            # helper would raise NameError if called - looks like dead code
            # left over from an older version.
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()    # fired when bitcoind has new work
        tracker_updated = variable.Event() # fired when the share chain changed
        
        # share_hash -> (last_request_time, request_count), drives re-request backoff
        requested = expiring_dict.ExpiringDict(300)
102         
        @defer.inlineCallbacks
        def set_real_work1():
            """Poll bitcoind for fresh work and update current_work/current_work2."""
            work, height = yield getwork(bitcoind)
            # XXX call tracker_updated
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                height=height,
                # preserve the share-chain head; it is maintained by set_real_work2
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                # offset between the local clock and bitcoind's work timestamp
                clock_offset=time.time() - work.timestamp,
            ))
117         
        @defer.inlineCallbacks
        def set_real_work2():
            """Re-evaluate the best share-chain head and request missing parent shares from peers."""
            best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            for peer2, share_hash in desired:
                # exponential backoff: skip if we asked recently (window grows 1.5x per attempt)
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
                    continue
                # prefer peers that announced a head descending from this share
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # after the first try, usually pick a random knowledgeable peer instead
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop just short of our known heads so the peer doesn't resend what we have
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    )),
                )
                requested[share_hash] = time.time(), count + 1
150         
        print 'Initializing work...'
        yield set_real_work1()
        yield set_real_work2()
        print '    ...success!'
        
        # node start time, expressed in bitcoind's clock
        start_time = time.time() - current_work2.value['clock_offset']
157         
158         # setup p2p logic and join p2pool network
159         
160         def share_share(share, ignore_peer=None):
161             for peer in p2p_node.peers.itervalues():
162                 if peer is ignore_peer:
163                     continue
164                 peer.send_shares([share])
165             share.flag_shared()
166         
        def p2p_shares(shares, peer=None):
            """Process shares received from the p2p network (or produced locally).

            Adds new shares to the tracker, submits any that also qualify as
            full bitcoin blocks, and fires tracker_updated if anything changed.
            """
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            some_new = False
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                some_new = True
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                # a share that also meets the bitcoin block target is a full block - submit it
                if share.bitcoin_hash <= share.header['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    else:
                        print 'No bitcoind connection! Erp!'
            
            # remember that this peer knows about the chain containing the first share
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if some_new:
                tracker_updated.happened()
            
            if len(shares) > 5:
                print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
202         
203         def p2p_share_hashes(share_hashes, peer):
204             get_hashes = []
205             for share_hash in share_hashes:
206                 if share_hash in tracker.shares:
207                     pass # print 'Got share hash, already have, ignoring. Hash: %s' % (p2pool.format_hash(share_hash),)
208                 else:
209                     print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
210                     get_hashes.append(share_hash)
211             
212             if share_hashes and peer is not None:
213                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
214             if get_hashes:
215                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
216         
217         def p2p_get_shares(share_hashes, parents, stops, peer):
218             parents = min(parents, 1000//len(share_hashes))
219             stops = set(stops)
220             shares = []
221             for share_hash in share_hashes:
222                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
223                     if share.hash in stops:
224                         break
225                     shares.append(share)
226             peer.send_shares(shares, full=True)
227         
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            """Parse 'ADDR[:PORT]' into (host, port), defaulting to the network's p2p port."""
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
236         
        # bootstrap nodes: two hardcoded IPs plus one resolved via DNS (best effort)
        nodes = [
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
        ]
        try:
            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
        except:
            log.err(None, 'Error resolving bootstrap node IP:')
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            # persist known peer addresses in a SQLite file next to the executable
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
        )
        # wire in the handlers defined above
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
259         
        # send share when the chain changes to their chain
        def work_changed(new_work):
            """On a new best share, broadcast every not-yet-shared share on that chain."""
            #print 'Work changed:', new_work
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break  # everything older on the chain has already been shared
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
268         
        print '    ...success!'
        print
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle_root -> transactions of work we handed out, so submissions can be matched up
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # random tag mixed into coinbase nonces so shares from this run are recognizable
        run_identifier = struct.pack('<Q', random.randrange(2**64))
280         
        def compute(state, all_targets):
            """Build a getwork BlockAttempt for a miner from the current state.

            Raises jsonrpc.Error while the share chain is still downloading.
            """
            if state['best_share_hash'] is None:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            # pick known-good mempool transactions, bounded by the merkle-branch
            # limit and a 500000-byte total size cap
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=my_script,
                # block subsidy (halving every 210000 blocks) plus fees of the included txs
                subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                block_target=state['target'],
                net=args.net,
            )
            print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
            
            # clamp the timestamp to exceed the median of the previous 11 share
            # timestamps (cf. bitcoin's median-time-past rule)
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            if not all_targets:
                # cap the share target handed to the miner unless it asked for all targets
                target2 = min(2**256//2**32 - 1, target2)
            # record when this work unit went out, keyed by its coinbase nonce
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
        
        my_shares = set()  # hashes of shares produced by our own miners
        times = {}         # coinbase nonce -> time the work was handed out
327         
        def got_response(data):
            """Handle a getwork submission from a miner.

            Returns True iff the submission produced a share extending the
            current best chain; False on any failure.
            """
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                # meets the full bitcoin target? submit it as a block
                if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                # validate against the (much easier) share target from the coinbase
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if hash_ > target:
                    print 'Received invalid share from worker - %x/%x' % (hash_, target)
                    return False
                share = p2pool.Share.from_block(block)
                my_shares.add(share.hash)
                print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
361         
362         def get_rate():
363             if current_work.value['best_share_hash'] is not None:
364                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
365                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
366                 return att_s
367         
368         def get_users():
369             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
370             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
371             res = {}
372             for script in sorted(weights, key=lambda s: weights[s]):
373                 res[script.encode('hex')] = weights[script]/total_weight
374             return res
375
376         
        reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate, get_users)))
        
        print '    ...success!'
        print
        
        # done!
        
        # mempool mirror: tx hash -> Tx wrapper, expiring after 600s
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        # cached bitcoind getrawtransaction lookups, keyed by tx hash
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
386         
        class Tx(object):
            """A mempool transaction tracked for possible inclusion in generated work."""
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                # this tx's own hash plus every input's previous-output hash
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                # kicks off asynchronous lookups; result lands in parents_all_in_blocks/value_in
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                """Look up each input's source tx; set parents_all_in_blocks once all are confirmed."""
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return  # leave parents_all_in_blocks False on any lookup failure
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                # NOTE(review): is_good2 is not defined on this class or anywhere in
                # this file - reaching this line would raise AttributeError. Likely
                # leftover from an older version; currently hard to hit since the
                # tx feed (factory.new_tx.watch) is disabled below. Confirm.
                x = self.is_good2()
                #print 'is_good:', x
                return x
423         
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            """Fetch a newly-announced transaction from bitcoind over p2p and add it to tx_pool."""
            try:
                assert isinstance(tx_hash, (int, long))
                #print 'REQUESTING', tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print 'GOT', tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                log.err(None, 'Error handling tx:')
        # disable for now, for testing impact on stales
        #factory.new_tx.watch(new_tx)
436         
        def new_block(block_hash):
            # a new bitcoin block makes our current work stale - poll bitcoind again
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
443         
        @defer.inlineCallbacks
        def work1_thread():
            """Loop forever, refreshing bitcoind work on new-block events or about once a second."""
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                # wake on the event or after an exponentially-distributed delay (mean 1s)
                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
453         
        @defer.inlineCallbacks
        def work2_thread():
            """Loop forever, re-evaluating the share chain on tracker changes or about once a second."""
            while True:
                flag = tracker_updated.get_deferred()
                try:
                    yield set_real_work2()
                except:
                    log.err()
                # wake on the event or after an exponentially-distributed delay (mean 1s)
                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
463         
        work1_thread()
        work2_thread()
        
        # counts our own shares (tagged with run_identifier) present in the chain
        counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
        
        # status loop: roughly once a second, print pool/local rate and stale counts
        while True:
            yield deferral.sleep(random.expovariate(1/1))
            try:
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 5:
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
                        # our shares still present in the recent chain; the difference
                        # from len(my_shares) approximates our stale count
                        count = counter(current_work.value['best_share_hash'], height, 2**100)
                        print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale)' % (
                            math.format(att_s),
                            height,
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(weights.get(my_script, 0)/total_weight*att_s),
                            len(my_shares),
                            len(my_shares) - count,
                        )
                        #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
                        #for k, v in weights.iteritems():
                        #    print k.encode('hex'), v/total_weight
            except:
                log.err()
    except:
        # any unhandled error during startup or the status loop is fatal
        log.err(None, 'Fatal error:')
        reactor.stop()
494
def run():
    """Parse command-line arguments, set up debug logging, and start the reactor running main()."""
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--testnet',
        help='use the testnet',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional arguments: RPC credentials
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
        class TimestampingPipe(object):
            """File-like wrapper that prefixes each complete output line with a timestamp."""
            def __init__(self, inner_file):
                self.inner_file = inner_file
                self.buf = ''       # holds the trailing partial line between writes
                self.softspace = 0  # required by the Python 2 'print' protocol
            def write(self, data):
                buf = self.buf + data
                lines = buf.split('\n')
                for line in lines[:-1]:
                    self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.buf = lines[-1]
            def flush(self):
                # NOTE(review): any buffered partial line in self.buf is not written here
                self.inner_file.flush()
        sys.stdout = TimestampingPipe(sys.stdout)
        sys.stderr = TimestampingPipe(sys.stderr)
        log.DefaultObserver.stderr = sys.stderr
    
    # fill in net-dependent port defaults when not given explicitly
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    reactor.callWhenRunning(main, args)
    reactor.run()