Pods status benchmarking #809

Open · wants to merge 3 commits into master
106 changes: 106 additions & 0 deletions scripts/log-kubectl.py
@@ -0,0 +1,106 @@
import subprocess
import click
import signal
import sys
import os
import time

# Construct the absolute path to the reana-client directory
current_dir = os.path.dirname(os.path.abspath(__file__))
reana_client_path = os.path.abspath(os.path.join(current_dir, '..', '..', 'reana-client'))

# Add the reana-client directory to the sys.path
if reana_client_path not in sys.path:
sys.path.insert(0, reana_client_path)

from reana_client.api.client import get_workflow_logs

# Global variable to control the loop
continue_logging = True

def signal_handler(sig, frame):
global continue_logging
continue_logging = False
print("Stopping log collection...")

def run_command_with_retries(command, max_retries=5, delay=2):
attempt = 0
while attempt < max_retries:
try:
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return process
except Exception as e:
print(f"An error occurred: {e}. Attempt {attempt + 1} of {max_retries}")
attempt += 1
time.sleep(delay)
print("All attempts to run the command have failed.")
return None

@click.command()
@click.option('--log-file', required=True, help='The name of the file to save the logs.')
@click.option('--workflow', required=True, help='The name of the workflow')
def run_command_and_save_logs(log_file, workflow):
"""This script allows to check the workflow pods status and save the logs file for further graphical evaluation
To run this script:

Steps to run benchmarking workflow lifetime test:
.. code-block:: console
\b
#To run this script
$ cd reana/scripts
$ python log-kubectl.py --log-file logs-file --workflow your-workflow

"""
global continue_logging

# Get the access token from environment variable
access_token = os.getenv('REANA_ACCESS_TOKEN')
if not access_token:
print("Error: REANA_ACCESS_TOKEN environment variable is not set.")
sys.exit(1)

# Call get_workflow_logs with the workflow and access_token
service = get_workflow_logs(workflow, access_token)
# Extract batch_id from the service response
batch_id = service.get('workflow_id')
if not batch_id:
print("Error: 'workflow_id' not found in the service response.")
sys.exit(1)
print("The batch_id is suppose to be this:", batch_id)

# Register the signal handler
signal.signal(signal.SIGINT, signal_handler)

command = (
"kubectl get pod -o='custom-columns=NAME:.metadata.name,PHASE:.status.phase,"
"CREATED:.metadata.creationTimestamp,STARTED:.status.startTime,"
"STARTED_CONTAINERS:.status.containerStatuses[*].state.*.startedAt,"
"FINISHED_CONTAINERS:.status.containerStatuses[*].state.*.finishedAt' -w"
)
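    # Each output line is whitespace-separated in the column order defined above:
    # NAME PHASE CREATED STARTED STARTED_CONTAINERS FINISHED_CONTAINERS.
    # The companion plotting script parses these columns by index.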

with open(log_file, "a") as file: # 'a' is for appending to the file without truncating it
while continue_logging:
process = run_command_with_retries(command)
if not process:
print("Failed to start the command")
break
try:
while continue_logging:
line = process.stdout.readline()
if not line:
break
line = line.strip() # Strip the line of leading/trailing whitespace
print(line)
file.write(line + "\n")
if f"reana-run-batch-{batch_id}" in line and "Failed" in line:
file.write(line + '\n') # Ensure newline is added when writing to file
continue_logging = False
break
except Exception as e:
print(f"An error occurred while reading the process output: {e}")
finally:
process.terminate()
process.wait()

if __name__ == "__main__":
run_command_and_save_logs()
207 changes: 207 additions & 0 deletions scripts/logs_lifetime_benchmarking_test.py
@@ -0,0 +1,207 @@
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter, SecondLocator
import click
import os
import re
import sys
import logging
from reana_client.api.client import get_workflow_logs

# Ensure REANA_ACCESS_TOKEN is set
access_token = os.getenv('REANA_ACCESS_TOKEN')
if not access_token:
print("Error: REANA_ACCESS_TOKEN environment variable is not set.")
sys.exit(1)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to extract job IDs from lines
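# The pattern matches pod names of the form "reana-run-job-" followed by a UUID-like 8-4-4-4-12 suffix.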
def extract_job_ids_from_lines(lines):
job_ids = set()
for line in lines:
match = re.search(r'reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}', line)
if match:
job_ids.add(match.group(0))
return job_ids

# Function to parse log file
def parse_log_file(file_path):
with open(file_path, 'r') as f:
lines = f.readlines()
return lines

# Function to write filtered lines to a new file
def write_filtered_log_file(filtered_lines, filtered_file_path):
with open(filtered_file_path, 'w') as f:
f.writelines(filtered_lines)

# Function to extract unique jobs from lines
def extract_unique_jobs(lines):
unique_jobs = {}
for line in lines:
match = re.match(r'(reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12})', line)
if match:
job_id = match.group(0)
unique_jobs[job_id] = line.strip()
return unique_jobs.values()

# Function to extract succeeded timestamps from unique jobs
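# Column 5 is FINISHED_CONTAINERS in the kubectl custom-columns output; "<none>" means the containers have not finished.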
def extract_succeeded_timestamps(unique_jobs):
succeeded_timestamps = [line.split()[5] for line in unique_jobs if line.split()[5] != "<none>"]
succeeded_timestamps = [ts.split(',')[0] for ts in succeeded_timestamps]
return pd.to_datetime(succeeded_timestamps, errors='coerce')

# Function to get sorted data by timestamp
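# Lines are sorted by column 2 (CREATED), the pod creation timestamp.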
def get_sorted_data(lines):
sorted_lines = []
for line in lines:
parts = line.split()
try:
timestamp = pd.to_datetime(parts[2])
sorted_lines.append((timestamp, line))
except Exception as e:
logger.error(f"Error parsing date from line: {line}. Error: {e}")
sorted_lines.sort(key=lambda x: x[0])
return [line for _, line in sorted_lines]

# Function to filter jobs based on status
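# Column 1 is PHASE, e.g. Pending, Running, Succeeded, or Failed.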
def filter_jobs(sorted_data, status):
return [line for line in sorted_data if line.split()[1] == status]

# Function to extract running timestamps
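# A running interval spans from STARTED (column 3) to the container finish time (column 5);
# each interval is encoded as a +1 event at its start and a -1 event at its end.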
def extract_running_timestamps(running_jobs):
timestamps_running = []
encountered_jobs_running = set()

for line in running_jobs:
parts = line.split()
job_id = parts[0]
if job_id in encountered_jobs_running:
continue

start_time = pd.to_datetime(parts[3])
finish_time_str = parts[5].split(',')[0]

if finish_time_str != '<none>':
finish_time = pd.to_datetime(finish_time_str)
timestamps_running.append((start_time, 1))
timestamps_running.append((finish_time, -1))
encountered_jobs_running.add(job_id)

timestamps_running.sort()
return timestamps_running

# Function to extract pending timestamps
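# A pending interval spans from CREATED (column 2) to STARTED (column 3),
# encoded as +1/-1 events in the same way as the running intervals.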
def extract_pending_timestamps(pending_jobs):
timestamps_pending = []
encountered_jobs_pending = set()

for line in pending_jobs:
parts = line.split()
job_id = parts[0]
if job_id in encountered_jobs_pending:
continue

start_time_str = parts[2]
if start_time_str == '<none>':
continue

start_time = pd.to_datetime(start_time_str)
finish_time_str = parts[3].split(',')[0]

if finish_time_str != '<none>':
finish_time = pd.to_datetime(finish_time_str)
timestamps_pending.append((start_time, 1))
timestamps_pending.append((finish_time, -1))
encountered_jobs_pending.add(job_id)

timestamps_pending.sort()
return timestamps_pending

# Function to calculate cumulative timestamps
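# The running sum over the +1/-1 events yields the number of jobs in the given state at each timestamp.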
def calculate_cumulative(timestamps):
x = []
y = []
cumulative_sum = 0
for timestamp in timestamps:
cumulative_sum += timestamp[1]
x.append(timestamp[0])
y.append(cumulative_sum)
return x, y

# Function to plot data
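# Finished jobs are plotted as the cumulative count of succeeded timestamps;
# running and pending jobs are plotted as concurrency curves over time.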
def plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending, title, figsize):
plt.figure(figsize=figsize)
plt.plot(succeeded_counts.index, succeeded_counts.cumsum(), label='Finished', linestyle='-', color='green', alpha=0.5)
plt.plot(x_running, y_running, linestyle='-', color='blue', alpha=0.5, linewidth=3, label='Running')
plt.plot(x_pending, y_pending, linestyle='-', color='orange', alpha=0.5, linewidth=3, label='Pending')
plt.xlabel('Processing time')
plt.ylabel('Number of Jobs')
plt.title(title)
plt.gca().xaxis.set_major_formatter(DateFormatter("%H:%M:%S"))
plt.gca().xaxis.set_major_locator(SecondLocator(interval=40))
plt.grid(True)
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

@click.command()
@click.argument('file_path')
@click.option('--title', default='Analysis Results', help='Title of the analysis results')
@click.option('--figsize', nargs=2, type=float, default=(12, 8), help='Figure size as two float values')
@click.option('--workflow', required=True, help='Name of the REANA workflow')
def main(file_path, title, figsize, workflow):
""" This script allows to plot the workflow lifetime statistics.
As a results of evaluating the logs file with pod life cycle information,
the statistics of how many jobe were running in parallel can be found.


Steps to run benchmarking workflow lifetime test:
.. code-block:: console
\b
#To run this script
$ kubectl #To save a live logs
$ cd reana/scripts
$ python lifetime.py logs.txt # insert your .txt file with logs and the name of the workflow
"""
service = get_workflow_logs(workflow, access_token)
log_string = service['logs']
reana_run_job_ids = re.findall(r'reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}', log_string)

lines = parse_log_file(file_path)
file_job_ids = extract_job_ids_from_lines(lines)

diff_job_ids = set(reana_run_job_ids).symmetric_difference(file_job_ids)
if diff_job_ids:
print("Differing Job IDs:")
for job_id in diff_job_ids:
print(job_id)
else:
print("No differing Job IDs found.")

filtered_lines = [line for line in lines if not any(job_id in line for job_id in diff_job_ids)]
filtered_file_path = 'filtered_' + file_path
write_filtered_log_file(filtered_lines, filtered_file_path)

unique_jobs = extract_unique_jobs(filtered_lines)
succeeded_timestamps = extract_succeeded_timestamps(unique_jobs)
succeeded_counts = succeeded_timestamps.value_counts().sort_index()

sorted_data = get_sorted_data(filtered_lines)
running_jobs = filter_jobs(sorted_data, 'Running')
timestamps_running = extract_running_timestamps(running_jobs)
x_running, y_running = calculate_cumulative(timestamps_running)

pending_jobs = filter_jobs(sorted_data, 'Pending')
timestamps_pending = extract_pending_timestamps(pending_jobs)
x_pending, y_pending = calculate_cumulative(timestamps_pending)

plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending, title, figsize)

if __name__ == "__main__":
main()