Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use pyhafas #36

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 94 additions & 141 deletions schiene/schiene.py
Original file line number Diff line number Diff line change
@@ -1,150 +1,103 @@
#!/usr/bin/env python3
import requests
import json
from datetime import datetime
from bs4 import BeautifulSoup


def parse_connections(html):
soup = BeautifulSoup(html, "html.parser")
connections = list()

for row in soup.find_all("td", class_="overview timelink"):
columns = row.parent.find_all("td")

try:
price_raw = columns[3].find("span", class_="bold").text.strip().replace(',', '.')
price = float(price_raw)
except:
price = None

data = {
'details': columns[0].a.get('href').replace('!details=opened!', '!details=opened!detailsVerbund=opened!'),
'departure': columns[0].a.contents[0].string,
'arrival': columns[0].a.contents[2].string,
'transfers': int(columns[2].contents[0]),
'time': columns[2].contents[2],
'products': columns[3].contents[0].split(', '),
'price': price
}

if not columns[1].find('img') and not columns[1].find('span', class_='delay') and not columns[1].find('span', class_='delayOnTime'):
data['ontime'] = True
data['canceled'] = False
else:
data = parse_delay(data)

connections.append(data)
return connections

def parse_stations(html):
"""
Strips JS code, loads JSON
"""
html = html.replace('SLs.sls=', '').replace(';SLs.showSuggestion();', '')
html = json.loads(html)
return html['suggestions']

def parse_delay(data):
"""
Prase the delay
"""
# parse data from the details view
rsp = requests.get(data['details'])
soup = BeautifulSoup(rsp.text, "html.parser")

# get departure delay
delay_departure_raw = None
try:
delay_departure_raw= soup.find('div', class_="routeStart").find('span', class_=["delay", "delayOnTime"])
except:
pass

if delay_departure_raw:
delay_departure = calculate_delay(data['departure'],
delay_departure_raw.text)
else:
delay_departure = 0

# get arrival delay
delay_arrival_raw = None
try:
delay_arrival_raw = soup.find('div', class_=["routeEnd","routeEndAdditional"]).find('span', class_=["delay", "delayOnTime"])
except:
pass

if delay_arrival_raw:
delay_arrival = calculate_delay(data['arrival'],
delay_arrival_raw.text)
else:
delay_arrival = 0

# save the parsed data
if delay_departure + delay_arrival == 0:
data['ontime'] = True
else:
data['ontime'] = False
data['delay'] = {
'delay_departure': int(delay_departure),
'delay_arrival': int(delay_arrival)
}

# TODO: this should not be hardcoded!
data['canceled'] = False

return data

def calculate_delay(original, delay):
"""
Calculate the delay
"""
original = datetime.strptime(original, '%H:%M')
delayed = datetime.strptime(delay, '%H:%M')
# if the original time was before 0:00 and the delayed time is in the new day, we need to do a different calculation
if delayed < original:
fullday = 86400
diff = fullday - original + delayed
else:
diff = delayed - original
return diff.total_seconds() // 60


class Schiene():
def __format_timedelta(self, delta):
"""Formats a timedelta duration to [N days] %H:%M:%S format"""
seconds = int(delta.total_seconds())

secs_in_a_day = 86400
secs_in_a_hour = 3600
secs_in_a_min = 60

days, seconds = divmod(seconds, secs_in_a_day)
hours, seconds = divmod(seconds, secs_in_a_hour)
minutes, seconds = divmod(seconds, secs_in_a_min)

time_fmt = f"{hours:02d}:{minutes:02d}"

if days > 0:
suffix = "s" if days > 1 else ""
return f"{days} day{suffix} {time_fmt}"

return time_fmt

def __parse_journeys(self, journeys, client):
connections = list()
for i in range(3):
journey = client.journey(journeys[i].id)
products = ''
cancel_info = ''
cancelled = False

for leg in journey.legs:
product = str(leg.name)

if len(products) == 0:
products = product
else:
products = products + " / " + product

if leg.cancelled and len(cancel_info) == 0:
cancel_info = " (cancelled)"
cancelled = True

time = self.__format_timedelta(journey.duration)
data = {
'details': 'To be removed',
'departure': (journey.legs[0].departure).strftime("%H:%M") + cancel_info,
'arrival': (journey.legs[0].arrival).strftime("%H:%M") + cancel_info,
'transfers': len(journey.legs)-1,
'time': time,
'products': products,
'price': 'Not supported at the moment - issue shall be placed in pyhafas',
'canceled': cancelled
}

if journey.legs[0].departureDelay == None:
delay_departure = 0
else:
delay_departure = journey.legs[0].departureDelay

if journey.legs[len(journey.legs)-1].arrivalDelay == None:
delay_arrival = 0
else:
delay_arrival = journey.legs[len(journey.legs)-1].arrivalDelay

if delay_departure + delay_arrival == 0:
data['ontime'] = True
else:
data['ontime'] = False
data['delay'] = {
'delay_departure': int(delay_departure.seconds/60),
'delay_arrival': int(delay_arrival.seconds/60)
}

connections.append(data)
return connections

def stations(self, station, limit=10):
"""
Find stations for given queries
def connections(self, origin, destination, dt=datetime.now(), only_direct=False):
from pyhafas import HafasClient
from pyhafas.profile import DBProfile
from pyhafas.types.fptf import Leg

Args:
station (str): search query
limit (int): limit number of results
"""
query = {
'start': 1,
'S': station + '?',
'REQ0JourneyStopsB': limit
}
rsp = requests.get('http://reiseauskunft.bahn.de/bin/ajax-getstop.exe/dn', params=query)
return parse_stations(rsp.text)
client = HafasClient(DBProfile())

origin_location = client.locations(origin)[0]
destination_location = client.locations(destination)[0]

def connections(self, origin, destination, dt=datetime.now(), only_direct=False):
"""
Find connections between two stations

Args:
origin (str): origin station
destination (str): destination station
dt (datetime): date and time for query
only_direct (bool): only direct connections
"""
query = {
'S': origin,
'Z': destination,
'date': dt.strftime("%d.%m.%y"),
'time': dt.strftime("%H:%M"),
'start': 1,
'REQ0JourneyProduct_opt0': 1 if only_direct else 0
}
rsp = requests.get('http://mobile.bahn.de/bin/mobil/query.exe/dox?', params=query)
return parse_connections(rsp.text)
if only_direct:
changes = 0
else:
changes = 100

journeys = client.journeys(
origin=origin_location,
via=[],
destination=destination_location,
date=dt,
max_changes=changes,
)

return self.__parse_journeys(journeys, client)