This repository has been archived by the owner on Mar 2, 2022. It is now read-only.

Make code PEP8 compliant #73

Open · wants to merge 1 commit into develop
24 changes: 15 additions & 9 deletions bwscanner/aggregate.py
@@ -55,18 +55,22 @@ def write_aggregate_data(tor, scan_dirs, file_name="aggregate_measurements"):
for relay_fp in measurements.keys():
log.debug("Aggregating measurements for {relay}", relay=relay_fp)

mean_bw = int(sum(measurements[relay_fp]) // len(measurements[relay_fp]))
mean_bw = int(sum(measurements[relay_fp]) //
len(measurements[relay_fp]))

# Calculate the "filtered bandwidth"
filtered_bws = [bw for bw in measurements[relay_fp] if bw >= mean_bw]
if filtered_bws:
mean_filtered_bw = int(sum(filtered_bws) // len(filtered_bws))
if not filtered_bws or mean_filtered_bw <= 0:
log.debug("Could not calculate a valid filtered bandwidth, skipping relay.")
log.debug("Could not calculate a valid filtered bandwidth, "
"skipping relay.")
continue

routerstatus_info = yield tor.protocol.get_info_raw('ns/id/' + relay_fp.lstrip("$"))
descriptor_info = yield tor.protocol.get_info_raw('desc/id/' + relay_fp.lstrip("$"))
routerstatus_info = yield tor.protocol.get_info_raw(
'ns/id/' + relay_fp.lstrip("$"))
descriptor_info = yield tor.protocol.get_info_raw(
'desc/id/' + relay_fp.lstrip("$"))
relay_routerstatus = RouterStatusEntryV3(routerstatus_info)
relay_descriptor = RelayDescriptor(descriptor_info)

@@ -79,15 +83,17 @@ def write_aggregate_data(tor, scan_dirs, file_name="aggregate_measurements"):
num_measurements = len(measurements[relay_fp])
circ_fail_rate = num_failures / (num_measurements + num_failures)
else:
log.debug("Not enough measurements to calculate the circuit fail rate.")
log.debug(
"Not enough measurements to calculate the circuit fail rate.")
circ_fail_rate = 0.0

desc_bw = relay_descriptor.average_bandwidth
line_format = ("node_id={} nick={} strm_bw={} filt_bw={} circ_fail_rate={} "
"desc_bw={} ns_bw={}\n")
line_format = ("node_id={} nick={} strm_bw={} filt_bw={} "
"circ_fail_rate={} desc_bw={} ns_bw={}\n")

aggregate_file.write(line_format.format(relay_fp, nickname, mean_bw, mean_filtered_bw,
circ_fail_rate, desc_bw, ns_bw))
aggregate_file.write(line_format.format(
relay_fp, nickname, mean_bw, mean_filtered_bw,
circ_fail_rate, desc_bw, ns_bw))

aggregate_file.close()
log.info("Finished outputting the aggregated measurements to {file}.",
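For context on the hunk above: the aggregation step is a plain mean over a relay's measurements plus a "filtered" mean over the samples at or above that mean. A minimal standalone sketch of the same arithmetic (the fingerprint and sample values are invented, and loading of the scan result files is omitted):

measurements = {"$A1B2C3": [5000, 7000, 12000, 400]}  # invented samples

for relay_fp, samples in measurements.items():
    mean_bw = int(sum(samples) // len(samples))
    # Keep only samples at or above the mean, then average those.
    filtered_bws = [bw for bw in samples if bw >= mean_bw]
    if not filtered_bws:
        continue  # no usable filtered bandwidth, skip the relay
    mean_filtered_bw = int(sum(filtered_bws) // len(filtered_bws))
    print(relay_fp, mean_bw, mean_filtered_bw)  # $A1B2C3 6100 9500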
16 changes: 11 additions & 5 deletions bwscanner/attacher.py
@@ -1,6 +1,7 @@
import txtorcon
from twisted.internet import defer, reactor, endpoints
from txtorcon.interface import CircuitListenerMixin, IStreamAttacher, StreamListenerMixin
from txtorcon.interface import (CircuitListenerMixin, IStreamAttacher,
StreamListenerMixin)
from zope.interface import implementer

from bwscanner.logger import log
@@ -101,6 +102,7 @@ class StreamClosedListener(StreamListenerMixin):
immediately after a stream completes rather than wait for the
circuit to time out.
"""

def __init__(self, circ):
self.circ = circ

@@ -114,7 +116,8 @@ def options_need_new_consensus(tor_config, new_options):
the Tor config with the new options.
"""
if "UseMicroDescriptors" in new_options:
if tor_config.UseMicroDescriptors != new_options["UseMicroDescriptors"]:
if tor_config.UseMicroDescriptors != \
new_options["UseMicroDescriptors"]:
log.debug("Changing UseMicroDescriptors from {current} to {new}.",
current=tor_config.UseMicroDescriptors,
new=new_options["UseMicroDescriptors"])
@@ -128,7 +131,8 @@ def wait_for_newconsensus(tor_state):
def got_newconsensus(event):
log.debug("Got NEWCONSENSUS event: {event}", event=event)
got_consensus.callback(event)
tor_state.protocol.remove_event_listener('NEWCONSENSUS', got_newconsensus)
tor_state.protocol.remove_event_listener(
'NEWCONSENSUS', got_newconsensus)

tor_state.protocol.add_event_listener('NEWCONSENSUS', got_newconsensus)
return got_consensus
@@ -146,7 +150,8 @@ def connect_to_tor(launch_tor, circuit_build_timeout, control_port=None,
tor_options = {
'LearnCircuitBuildTimeout': 0, # Disable adaptive circuit timeouts.
'CircuitBuildTimeout': circuit_build_timeout,
'UseEntryGuards': 0, # Disable UseEntryGuards to avoid PathBias warnings.
# Disable UseEntryGuards to avoid PathBias warnings.
'UseEntryGuards': 0,
'UseMicroDescriptors': 0,
'FetchUselessDescriptors': 1,
'FetchDirInfoEarly': 1,
@@ -163,7 +168,8 @@
else:
log.info("Trying to connect to a running Tor instance.")
if control_port:
endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", control_port)
endpoint = endpoints.TCP4ClientEndpoint(
reactor, "localhost", control_port)
else:
endpoint = None
tor = yield txtorcon.connect(reactor, endpoint)
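For reference, options_need_new_consensus above only asks whether the new options flip UseMicroDescriptors relative to the running config, since switching descriptor types means different directory documents have to be fetched. A rough standalone sketch of that check, using plain dicts instead of the txtorcon TorConfig object (values invented):

def needs_new_consensus(current_options, new_options):
    # Switching between micro and full descriptors requires fetching
    # the directory information again.
    if "UseMicroDescriptors" in new_options:
        if current_options.get("UseMicroDescriptors") != new_options["UseMicroDescriptors"]:
            return True
    return False

print(needs_new_consensus({"UseMicroDescriptors": 1}, {"UseMicroDescriptors": 0}))  # True
print(needs_new_consensus({"UseMicroDescriptors": 0}, {"CircuitBuildTimeout": 40}))  # False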
15 changes: 10 additions & 5 deletions bwscanner/circuit.py
@@ -49,6 +49,7 @@ class ExitScan(CircuitGenerator):
of the consensus in the exit position once. The first and
second hops are selected randomly.
"""

def __init__(self, state):
super(ExitScan, self).__init__(state)
random.shuffle(self.exits)
@@ -72,6 +73,7 @@ class TwoHop(CircuitGenerator):
Select two hop circuits with the relay to be measured and a random exit
relay of similar bandwidth.
"""

def __init__(self, state, partitions=1, this_partition=1, slice_width=50):
"""
TwoHop can be called multiple times with different partition
@@ -88,10 +90,13 @@ def circuit_generator():
choose an exit relay of similar bandwidth for the circuit
"""
num_relays = len(self.relays)
relay_subset = range(this_partition-1, num_relays, partitions)
log.info("Performing a measurement scan with {count} relays.", count=len(relay_subset))
relay_subset = range(this_partition - 1, num_relays, partitions)
log.info(
"Performing a measurement scan with {count} relays.",
count=len(relay_subset))

# Choose relays in a random order from the relays in this partition set.
# Choose relays in a random order from the relays in this
# partition set.
for i in random.sample(relay_subset, len(relay_subset)):
relay = self.relays[i]
yield relay, self.exit_by_bw(relay)
@@ -116,12 +121,12 @@ def exit_by_bw(self, relay):
if (exit.bandwidth < relay.bandwidth) and (i != num_exits):
continue

exit_slice = self.exits[i:i+self._slice_width]
exit_slice = self.exits[i:i + self._slice_width]
exits_needed = self._slice_width - len(exit_slice)

# There aren't enough exits, pick some slower exits for this slice.
if exits_needed:
slice_start = max(0, i-exits_needed)
slice_start = max(0, i - exits_needed)
exit_slice = self.exits[slice_start:i] + exit_slice

if relay in exit_slice:
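The relay_subset expression re-wrapped above is just a strided range, so each of N scanner partitions measures every Nth relay. A quick standalone illustration with invented numbers:

num_relays = 10
partitions = 3

for this_partition in (1, 2, 3):
    relay_subset = range(this_partition - 1, num_relays, partitions)
    print(this_partition, list(relay_subset))
# 1 [0, 3, 6, 9]
# 2 [1, 4, 7]
# 3 [2, 5, 8]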
31 changes: 20 additions & 11 deletions bwscanner/fetcher.py
@@ -3,8 +3,9 @@

from twisted.internet import interfaces, reactor, defer, protocol
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.web.client import (SchemeNotSupported, Agent, BrowserLikePolicyForHTTPS,
ResponseDone, PotentialDataLoss, PartialDownloadError)
from twisted.web.client import (SchemeNotSupported, Agent,
BrowserLikePolicyForHTTPS, ResponseDone,
PotentialDataLoss, PartialDownloadError)
from txsocksx.client import SOCKS5ClientFactory
from txsocksx.tls import TLSWrapClientEndpoint
from zope.interface import implementer
@@ -16,9 +17,10 @@ def get_tor_socks_endpoint(tor_state):
proxy_endpoint = tor_state.protocol.get_conf("SocksPort")

def extract_port_value(result):
# Get the first SOCKS port number if any. SocksPort can be a single string or a list.
# Tor now also has support for unix domain SOCKS sockets so we need to be careful to just
# pick a SOCKS port.
# Get the first SOCKS port number if any. SocksPort can be a
# single string or a list.
# Tor now also has support for unix domain SOCKS sockets so
# we need to be careful to just pick a SOCKS port.
if isinstance(result['SocksPort'], list):
port = next(port for port in result['SocksPort'] if port.isdigit())
else:
@@ -61,12 +63,15 @@ def connect(self, protocol_factory):
Implements L{IStreamClientEndpoint.connect} to connect via TCP, after
SOCKS5 negotiation and Tor circuit construction is done.
"""
proxy_factory = SOCKS5ClientFactory(self.host, self.port, protocol_factory)
self.tor_socks_endpoint.addCallback(lambda end: end.connect(proxy_factory))
proxy_factory = SOCKS5ClientFactory(
self.host, self.port, protocol_factory)
self.tor_socks_endpoint.addCallback(
lambda end: end.connect(proxy_factory))

def _create_circ(proto):
hp = proto.transport.getHost()
d = self.state._attacher.create_circuit(hp.host, hp.port, self.path)
d = self.state._attacher.create_circuit(
hp.host, hp.port, self.path)
d.addErrback(proxy_factory.deferred.errback)
return proxy_factory.deferred

@@ -96,9 +101,11 @@ def _getEndpoint(self, parsedURI, host=None, port=None):
if hasattr(self, '_wrapContextFactory'):
tls_policy = self._wrapContextFactory(host, port)
elif hasattr(self, '_policyForHTTPS'):
tls_policy = self._policyForHTTPS().creatorForNetloc(host, port)
tls_policy = self._policyForHTTPS().\
creatorForNetloc(host, port)
else:
raise NotImplementedError("Cannot create a TLS validation policy.")
raise NotImplementedError(
"Cannot create a TLS validation policy.")
endpoint = self._tlsWrapper(tls_policy, endpoint)
return endpoint

@@ -142,7 +149,9 @@ def connectionLost(self, reason):
else:
self.deferred.errback(reason)
else:
log.debug("Deferred already called before connectionLost on hashingReadBodyProtocol.")
log.debug(
"Deferred already called before connectionLost on "
"hashingReadBodyProtocol.")


def hashingReadBody(response):
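The SocksPort comment expanded above covers two cases: the value may be a single string or a list, and unix-domain socket entries have to be skipped because they are not numeric ports. A standalone sketch of that selection logic (example values invented; the real code gets the value from tor_state.protocol.get_conf):

def extract_port_value(result):
    value = result["SocksPort"]
    if isinstance(value, list):
        # Pick the first entry that is a plain port number.
        port = next(port for port in value if port.isdigit())
    else:
        port = value
    return int(port)

print(extract_port_value({"SocksPort": ["unix:/run/tor/socks", "9050"]}))  # 9050
print(extract_port_value({"SocksPort": "9150"}))  # 9150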
10 changes: 5 additions & 5 deletions bwscanner/listener.py
@@ -131,18 +131,18 @@ def circ_avg_bw(self, circuit):
for r, w, d in self.bw_samples(circuit):
# r and w are in units of bytes/second
# d is units of second
r_avg += (r**2)/d
w_avg += (w**2)/d
r_avg += (r**2) / d
w_avg += (w**2) / d
n_samples += 1
bytes_r_total += r
bytes_w_total += w
duration += d

if n_samples > 1:
wf = n_samples*bytes_r_total
wf = n_samples * bytes_r_total
return {'path': circuit.path,
'r_bw': int(r_avg/wf),
'w_bw': int(w_avg/wf),
'r_bw': int(r_avg / wf),
'w_bw': int(w_avg / wf),
'duration': duration,
'samples': n_samples,
'bytes_r': bytes_r_total,
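The spacing-only changes above leave the circ_avg_bw arithmetic intact; for reference, this is the accumulation it performs over the per-circuit (read rate, write rate, duration) samples. A standalone sketch with invented sample values:

samples = [(1000, 800, 1.0), (2000, 1500, 2.0), (1500, 1200, 1.0)]  # invented

r_avg = w_avg = 0.0
n_samples = bytes_r_total = bytes_w_total = 0
duration = 0.0
for r, w, d in samples:
    # Same accumulation as circ_avg_bw: rates squared and divided by duration.
    r_avg += (r ** 2) / d
    w_avg += (w ** 2) / d
    n_samples += 1
    bytes_r_total += r
    bytes_w_total += w
    duration += d

if n_samples > 1:
    wf = n_samples * bytes_r_total
    print({'r_bw': int(r_avg / wf), 'w_bw': int(w_avg / wf),
           'duration': duration, 'samples': n_samples})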
6 changes: 4 additions & 2 deletions bwscanner/logger.py
@@ -21,15 +21,17 @@ def setup_logging(log_level, log_name, log_directory=""):
Configure the logger to use the specified log file and log level
"""
log_filter = LogLevelFilterPredicate()
log_filter.setLogLevelForNamespace("bwscanner", LogLevel.levelWithName(log_level.lower()))
log_filter.setLogLevelForNamespace(
"bwscanner", LogLevel.levelWithName(log_level.lower()))

# Set up logging
log_file = DailyLogFile(log_name, log_directory)
file_observer = FileLogObserver(log_file, log_event_format)
console_observer = FileLogObserver(sys.stdout, log_event_format)

file_filter_observer = FilteringLogObserver(file_observer, (log_filter,))
console_filter_observer = FilteringLogObserver(console_observer, (log_filter,))
console_filter_observer = FilteringLogObserver(
console_observer, (log_filter,))

globalLogPublisher.addObserver(file_filter_observer)
globalLogPublisher.addObserver(console_filter_observer)
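The observers wired up here filter by namespace and level: only events logged under "bwscanner" at or above the configured level reach the file and console observers. A minimal console-only sketch of the same twisted.logger pattern (formatter and log file handling simplified):

import sys

from twisted.logger import (FileLogObserver, FilteringLogObserver, Logger,
                            LogLevel, LogLevelFilterPredicate, formatEvent,
                            globalLogPublisher)

log_filter = LogLevelFilterPredicate()
log_filter.setLogLevelForNamespace("bwscanner", LogLevel.info)

console_observer = FileLogObserver(sys.stdout, lambda event: formatEvent(event) + "\n")
globalLogPublisher.addObserver(FilteringLogObserver(console_observer, (log_filter,)))

log = Logger(namespace="bwscanner")
log.debug("dropped, below the configured level")
log.info("kept, at or above the configured level")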
25 changes: 14 additions & 11 deletions bwscanner/measurement.py
@@ -46,12 +46,12 @@ def __init__(self, state, clock, measurement_dir, **kwargs):
self.circuits = None
self.baseurl = 'https://bwauth.torproject.org/bwauth.torproject.org'
self.bw_files = {
64*1024: ("64M", "913b3c5df256d62235f955fa936e7a4e2d5e0cb6"),
32*1024: ("32M", "a536076ef51c2cfff607fec2d362671e031d6b48"),
16*1024: ("16M", "e91690ed2abf05e347b61aafaa23abf2a2b3292f"),
8*1024: ("8M", "c690229b300945ec4ba872b80e8c443e2e1750f0"),
4*1024: ("4M", "94f7bc6679a4419b080debd70166c2e43e80533d"),
2*1024: ("2M", "9793cc92932598898d22497acdd5d732037b1a13"),
64 * 1024: ("64M", "913b3c5df256d62235f955fa936e7a4e2d5e0cb6"),
32 * 1024: ("32M", "a536076ef51c2cfff607fec2d362671e031d6b48"),
16 * 1024: ("16M", "e91690ed2abf05e347b61aafaa23abf2a2b3292f"),
8 * 1024: ("8M", "c690229b300945ec4ba872b80e8c443e2e1750f0"),
4 * 1024: ("4M", "94f7bc6679a4419b080debd70166c2e43e80533d"),
2 * 1024: ("2M", "9793cc92932598898d22497acdd5d732037b1a13"),
}

self.result_sink = ResultSink(self.measurement_dir, chunk_size=10)
@@ -69,9 +69,9 @@ def choose_file_size(self, path):

XXX: Should we just use the bandwidth of the measured relay instead?
"""
avg_bw = sum([r.bandwidth for r in path])/len(path)
avg_bw = sum([r.bandwidth for r in path]) / len(path)
for size in sorted(self.bw_files.keys()):
if avg_bw*5 < size:
if avg_bw * 5 < size:
return size
return max(self.bw_files.keys())

@@ -136,7 +136,8 @@ def get_circuit_bw(result):
report['path_desc_bws'].append((yield self.get_r_desc_bw(relay)))
report['path_ns_bws'].append((yield self.get_r_ns_bw(relay)))
report['path_bws'] = [r.bandwidth for r in path]
log.info("Download successful for router {fingerprint}.", fingerprint=path[0].id_hex)
log.info(
"Download successful for router {fingerprint}.", fingerprint=path[0].id_hex)
defer.returnValue(report)

def circ_failure(failure):
@@ -154,7 +155,8 @@ def timeoutDeferred(deferred, timeout):
def cancelDeferred(deferred):
deferred.cancel()

delayedCall = self.clock.callLater(timeout, cancelDeferred, deferred)
delayedCall = self.clock.callLater(
timeout, cancelDeferred, deferred)

def gotResult(result):
if delayedCall.active():
@@ -181,7 +183,8 @@ def get_r_ns_bw(self, router):

raw_descriptor = yield self.state.protocol.get_info_raw('ns/id/{}'.format(router.id_hex))
router_ns_entry = RouterStatusEntryV3(raw_descriptor)
defer.returnValue((router_ns_entry.bandwidth, router_ns_entry.is_unmeasured))
defer.returnValue((router_ns_entry.bandwidth,
router_ns_entry.is_unmeasured))

@defer.inlineCallbacks
def get_r_desc_bw(self, router):
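choose_file_size above (only re-spaced here) picks the smallest test file larger than five times the path's average bandwidth, falling back to the largest file for fast paths. A standalone illustration with invented bandwidth values (the sizes mirror the bw_files keys; the file hashes are omitted):

file_sizes = sorted([2 * 1024, 4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024, 64 * 1024])

def choose_file_size(path_bandwidths):
    avg_bw = sum(path_bandwidths) / len(path_bandwidths)
    for size in file_sizes:
        if avg_bw * 5 < size:  # smallest file bigger than ~5x the average rate
            return size
    return max(file_sizes)

print(choose_file_size([300, 500]))      # avg 400 -> 2048
print(choose_file_size([20000, 40000]))  # fast path -> 65536 (largest file)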
24 changes: 16 additions & 8 deletions bwscanner/partition_scan.py
@@ -74,8 +74,10 @@ def __init__(self, state, clock, log_dir, stopped, relays, shared_secret, partit
consensus += relay + ","
consensus_hash = hashlib.sha256(consensus).digest()
shared_secret_hash = hashlib.sha256(shared_secret).digest()
prng_seed = hashlib.pbkdf2_hmac('sha256', consensus_hash, shared_secret_hash, iterations=1)
self.circuits = lazy2HopCircuitGenerator(relays, this_partition, partitions, prng_seed)
prng_seed = hashlib.pbkdf2_hmac(
'sha256', consensus_hash, shared_secret_hash, iterations=1)
self.circuits = lazy2HopCircuitGenerator(
relays, this_partition, partitions, prng_seed)

# XXX adjust me
self.result_sink = ResultSink(log_dir, chunk_size=1000)
@@ -119,23 +121,28 @@ def circuit_build_failure(f):
return None

time_start = self.now()
d = build_timeout_circuit(self.state, self.clock, route, self.circuit_life_duration)
d = build_timeout_circuit(
self.state, self.clock, route, self.circuit_life_duration)
d.addCallback(circuit_build_success)
d.addErrback(circuit_build_timeout)
d.addErrback(circuit_build_failure)
self.tasks.append(d)

def start_prometheus_exportor(self):
self.count_success = Counter('circuit_build_success_counter', 'successful circuit builds')
self.count_failure = Counter('circuit_build_failure_counter', 'failed circuit builds')
self.count_timeout = Counter('circuit_build_timeout_counter', 'timed out circuit builds')
self.count_success = Counter(
'circuit_build_success_counter', 'successful circuit builds')
self.count_failure = Counter(
'circuit_build_failure_counter', 'failed circuit builds')
self.count_timeout = Counter(
'circuit_build_timeout_counter', 'timed out circuit builds')

if not has_prometheus_client:
return
root = Resource()
root.putChild(b'metrics', MetricsResource())
factory = Site(root)
reactor.listenTCP(self.prometheus_port, factory, interface=self.prometheus_interface)
reactor.listenTCP(self.prometheus_port, factory,
interface=self.prometheus_interface)

def start(self):
self.start_prometheus_exportor()
Expand All @@ -147,7 +154,8 @@ def pop():
except StopIteration:
self.stop()
else:
self.call_id = self.clock.callLater(self.circuit_build_duration, pop)
self.call_id = self.clock.callLater(
self.circuit_build_duration, pop)
self.clock.callLater(0, pop)

def stop(self):
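The seed derivation re-wrapped above is what lets independently run scanner instances agree on the same pseudo-random circuit ordering: each hashes the relay list and the shared secret, then feeds both through PBKDF2. A standalone sketch (fingerprints and secret are invented; hashlib wants bytes):

import hashlib

relays = ["$AAAA", "$BBBB", "$CCCC"]  # invented fingerprints
shared_secret = b"example-shared-secret"

consensus = ("".join(relay + "," for relay in relays)).encode()
consensus_hash = hashlib.sha256(consensus).digest()
shared_secret_hash = hashlib.sha256(shared_secret).digest()

# Two scanners with the same relay list and secret derive the same seed.
prng_seed = hashlib.pbkdf2_hmac('sha256', consensus_hash,
                                shared_secret_hash, iterations=1)
print(prng_seed.hex())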