users request
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import sys
12 import time
13
14 from twisted.internet import defer, reactor
15 from twisted.web import server
16 from twisted.python import log
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22 import p2pool as p2pool_init
23
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    # Fetch the current work unit and chain height from bitcoind over
    # JSON-RPC; the returned deferred fires with (BlockAttempt, height).
    # Both RPCs are started before either is yielded on, so they run
    # concurrently.
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    finally:
        # get rid of residual errors
        # If the first yield raised, the other deferred may still fail later;
        # a no-op errback stops Twisted from reporting it as unhandled.
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
36
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    # Ask the connected bitcoind, via a p2p "checkorder" exchange, for a
    # script to send payouts to. The deferred fires with the script on
    # success, with None if the IP transaction was denied, and errbacks
    # with ValueError on any other reply.
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    # BUG FIX: the result was computed but never passed back, so the deferred
    # always fired with None and the caller unconditionally fell back to
    # get_payout_script2 even when checkorder succeeded.
    defer.returnValue(my_script)
47
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    # Fallback payout script: ask bitcoind for the 'p2pool' account address
    # over JSON-RPC and convert it to a pay-to-pubkey-hash output script.
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
52
@defer.inlineCallbacks
def main(args):
    # Top-level node logic: connects to bitcoind (RPC + p2p), joins the
    # p2pool share chain, serves work to miners, and loops printing status.
    # Runs forever inside the Twisted reactor; any fatal error stops it.
    try:
        print 'p2pool (version %s)' % (p2pool_init.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        temp_work, temp_height = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                # checkorder denied (or, with the unfixed get_payout_script,
                # always) -- derive a script from a bitcoind account address.
                print 'IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            # An explicit -a/--address on the command line wins over whatever
            # the checkorder exchange produced.
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        # Tracks bitcoin block heights via the p2p connection.
        ht = bitcoin.p2p.HeightTracker(factory)
        
        tracker = p2pool.OkayTracker(args.net)
        chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            # NOTE(review): `Chain` is not defined anywhere in this module, so
            # calling get_chain would raise NameError -- this looks like dead
            # code left over from an older design; confirm before relying on it.
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        tracker_updated = variable.Event()
        
        # share_hash -> (last request time, request count); throttles getshares.
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            # Refresh current_work from bitcoind. Updates the long-polled part
            # (current_work) and the non-long-polled part (current_work2).
            work, height = yield getwork(bitcoind)
            # XXX call tracker_updated
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                height=height,
                # Preserve the share-chain head chosen by set_real_work2.
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                # Offset between local clock and bitcoind's work timestamp.
                clock_offset=time.time() - work.timestamp,
            ))
        
        @defer.inlineCallbacks
        def set_real_work2():
            # Ask the tracker for the best share-chain head and any share
            # hashes it wants fetched, then issue throttled getshares requests.
            best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            for peer2, share_hash in desired:
                last_request_time, count = requested.get(share_hash, (None, 0))
                # Exponential backoff: skip if a recent request is still pending.
                if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
                    continue
                # Prefer peers known to hold a chain containing this share.
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # Mostly pick a random knowledgeable peer; sometimes (20%)
                    # fall back to the peer the tracker suggested.
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %x from %s' % (share_hash % 2**32, '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # Stop at current heads (and a few parents below each) --
                    # we already have everything beyond them.
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    )),
                )
                requested[share_hash] = time.time(), count + 1
        
        print 'Initializing work...'
        yield set_real_work1()
        yield set_real_work2()
        print '    ...success!'
        
        # Node start time in bitcoind-clock terms; currently unused below.
        start_time = time.time() - current_work2.value['clock_offset']
156         
157         # setup p2p logic and join p2pool network
158         
159         def share_share(share, ignore_peer=None):
160             for peer in p2p_node.peers.itervalues():
161                 if peer is ignore_peer:
162                     continue
163                 peer.send_shares([share])
164             share.flag_shared()
165         
        def p2p_shares(shares, peer=None):
            # Handle a batch of shares received from the network (or generated
            # locally, in which case peer is None): add new ones to the
            # tracker, submit any that solve a bitcoin block, and remember
            # which peer knows about this chain head.
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            some_new = False
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
                    continue
                some_new = True
                
                #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                # A share whose hash also meets the bitcoin target is a full
                # block -- forward it to bitcoind immediately.
                if share.bitcoin_hash <= share.header['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    else:
                        print 'No bitcoind connection! Erp!'
            
            # Record that this peer knows of the chain headed by shares[0].
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            # Wake work2_thread so the best chain head is re-evaluated.
            if some_new:
                tracker_updated.happened()
            
            if len(shares) > 5:
                print '... done processing %i shares.' % (len(shares),)
        
        def p2p_share_hashes(share_hashes, peer):
            # Handle an advertisement of share hashes from a peer: request any
            # we don't already have.
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
                else:
                    print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
                    get_hashes.append(share_hash)
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            # NOTE(review): peer is used without a None check here, although
            # the guard above suggests None is possible -- the p2p layer
            # appears to always supply a live peer; confirm.
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
215         
216         def p2p_get_shares(share_hashes, parents, stops, peer):
217             parents = min(parents, 1000//len(share_hashes))
218             stops = set(stops)
219             shares = []
220             for share_hash in share_hashes:
221                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
222                     if share.hash in stops:
223                         break
224                     shares.append(share)
225             peer.send_shares(shares, full=True)
226         
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            # Parse an 'addr' or 'addr:port' string into an (ip, port) tuple,
            # defaulting to the network's standard p2pool port.
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        # Hard-coded bootstrap nodes, plus one resolved by DNS (best-effort).
        nodes = [
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
        ]
        try:
            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
        except:
            log.err(None, 'Error resolving bootstrap node IP:')
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            # Known-peer addresses persisted next to the script in addrs.dat.
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
        )
        # Wire up incoming-message handlers defined above.
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            # Walk back from the new best head, broadcasting every share not
            # yet flagged as shared (stopping at the first already-shared one).
            #print 'Work changed:', new_work
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle root -> transaction list of work we handed out (expires 300s).
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # Random per-run tag embedded in generated nonces, used to recognize
        # our own shares (see CountsSkipList below).
        run_identifier = struct.pack('<Q', random.randrange(2**64))
279         
        def compute(state, all_targets):
            # Build a getwork BlockAttempt for a miner from the current state:
            # choose fee-paying transactions, create the p2pool generation
            # transaction, and remember the merkle root -> txs mapping so the
            # solved work can be reassembled in got_response.
            # Raises jsonrpc.Error while the share chain is still downloading.
            if state['best_share_hash'] is None:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            # Greedily pack transactions up to a 500kB serialized-size budget.
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=my_script,
                # Block subsidy (50 BTC halving every 210000 blocks) plus fees.
                subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                block_target=state['target'],
                net=args.net,
            )
            print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 300 seconds (merkle_root_to_transactions is ExpiringDict(300))
            
            # Timestamp in bitcoind-clock terms, but never earlier than the
            # median of the last 11 share timestamps plus one (chain rule).
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            if not all_targets:
                # Clamp to difficulty-1 so ordinary miners' results still count.
                target2 = min(2**256//2**32 - 1, target2)
            # Record hand-out time per nonce so share age can be printed later.
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
        
        # Hashes of shares we generated ourselves, and nonce -> hand-out time.
        my_shares = set()
        times = {}
326         
        def got_response(data):
            # Handle a getwork submission from a miner. Returns True when the
            # result is a valid, current share; False otherwise. Never raises:
            # this is the worker-facing boundary, so all errors are logged.
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                # Solves a full bitcoin block (or DEBUG forces submission).
                if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                # Check against the (easier) share target from the coinbase.
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if hash_ > target:
                    print 'Received invalid share from worker - %x/%x' % (hash_, target)
                    return False
                share = p2pool.Share.from_block(block)
                my_shares.add(share.hash)
                print 'GOT SHARE! %x prev %x age %.2fs' % (share.hash % 2**32, 0 if share.previous_hash is None else share.previous_hash % 2**32, time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
360         
361         def get_rate():
362             if current_work.value['best_share_hash'] is not None:
363                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
364                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
365                 return att_s
366         
367         def get_users():
368             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
369             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
370             res = {}
371             for script in sorted(weights, key=lambda s: weights[s]):
372                 res[script.encode('hex')] = weights[script]/total_weight
373             return res
374
        
        # Expose the worker interface (getwork in, solutions out) over HTTP.
        reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate, get_users)))
        
        print '    ...success!'
        print
        
        # done!
        
        # Memory pool of candidate transactions; entries expire after 600s and
        # are not refreshed on access (get_touches=False).
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        # Caches getrawtransaction lookups by tx hash for 100 seconds.
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
385         
        class Tx(object):
            # Wrapper around a mempool transaction that lazily determines
            # whether all of its inputs are confirmed in blocks, so compute()
            # can decide if it is safe to include.
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                # Hashes this tx "mentions": itself plus every spent outpoint.
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                # Fire-and-forget: the deferred is intentionally dropped; it
                # updates parents_all_in_blocks/value_in in the background.
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                # Look up every input's parent transaction via bitcoind; bail
                # out (leaving parents_all_in_blocks False) on any failure or
                # any parent not yet in a block.
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    # NOTE(review): key is 'txouts' here but 'tx_outs' above --
                    # presumably the RPC result uses a different schema than
                    # the local tx format; confirm against jsonrpc/deferral.
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                # NOTE(review): is_good2 is not defined on this class or
                # anywhere visible in this file -- reaching this line would
                # raise AttributeError. The Tx/tx_pool path is currently
                # disabled (factory.new_tx.watch is commented out below).
                x = self.is_good2()
                #print 'is_good:', x
                return x
422         
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            # Fetch a newly-announced transaction from bitcoind over p2p and
            # add it to the local tx_pool. Best-effort: errors are logged.
            try:
                assert isinstance(tx_hash, (int, long))
                #print 'REQUESTING', tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print 'GOT', tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                log.err(None, 'Error handling tx:')
        # disable for now, for testing impact on stales
        #factory.new_tx.watch(new_tx)
        
        def new_block(block_hash):
            # A new bitcoin block arrived -- wake work1_thread to re-getwork.
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        @defer.inlineCallbacks
        def work1_thread():
            # Refresh bitcoind work whenever work_updated fires, or anyway
            # roughly every second (exponential delay with mean 1s); errors
            # are logged and the loop continues.
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            # Same pattern as work1_thread, but re-evaluates the share chain
            # whenever tracker_updated fires.
            while True:
                flag = tracker_updated.get_deferred()
                try:
                    yield set_real_work2()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
        
        # Start both background loops (deferreds intentionally dropped).
        work1_thread()
        work2_thread()
        
        # Counts our own shares (tagged with run_identifier) along the chain.
        counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
        
        # Status loop: roughly once a second, print pool rate, our share of
        # recent weight, and our share/stale counts.
        while True:
            yield deferral.sleep(random.expovariate(1/1))
            try:
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 5:
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
                        count = counter(current_work.value['best_share_hash'], height, 2**100)
                        print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale)' % (
                            math.format(att_s),
                            height,
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(weights.get(my_script, 0)/total_weight*att_s),
                            len(my_shares),
                            # Shares we generated that no longer appear in the
                            # counted chain are presumed stale.
                            len(my_shares) - count,
                        )
                        #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
                        #for k, v in weights.iteritems():
                        #    print k.encode('hex'), v/total_weight
            except:
                log.err()
    except:
        # Anything escaping main is fatal: log it and stop the reactor.
        log.err(None, 'Fatal error:')
        reactor.stop()
493
def run():
    # Entry point: parse command-line arguments, apply defaults that depend
    # on the selected network, install debug logging if requested, and start
    # main() inside the Twisted reactor. Blocks until the reactor stops.
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--testnet',
        help='use the testnet',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # Positional arguments: RPC credentials are required.
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
        class TimestampingPipe(object):
            # File-like wrapper that prefixes each complete output line with
            # the current HH:MM:SS time; used to timestamp stdout/stderr.
            def __init__(self, inner_file):
                self.inner_file = inner_file
                self.buf = ''
                # softspace is required for Python 2's print statement.
                self.softspace = 0
            def write(self, data):
                # Buffer partial lines; only emit complete ones, timestamped.
                buf = self.buf + data
                lines = buf.split('\n')
                for line in lines[:-1]:
                    self.inner_file.write('%s %s\n' % (time.strftime('%H:%M:%S'), line))
                self.buf = lines[-1]
            def flush(self):
                self.inner_file.flush()
        sys.stdout = TimestampingPipe(sys.stdout)
        sys.stderr = TimestampingPipe(sys.stderr)
        log.DefaultObserver.stderr = sys.stderr
    
    # Network-dependent port defaults, applied after --testnet is known.
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    reactor.callWhenRunning(main, args)
    reactor.run()