Skip to content

Commit

Permalink
Adding log-kubectl file with changing the run logs check by using get…
Browse files Browse the repository at this point in the history
…_workflow function
  • Loading branch information
AndriiPovsten committed Jul 30, 2024
1 parent b37451d commit 23df409
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 74 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,4 @@ helm/reana/Chart.lock
# Helm releases
.cr-release-packages
.cr-index
helm/configurations/values-dev.yaml
26 changes: 0 additions & 26 deletions helm/configurations/values-dev.yaml

This file was deleted.

96 changes: 96 additions & 0 deletions scripts/log-kubectl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import subprocess
import click
import signal
import sys
import os
import time

# Construct the absolute path to the reana-client directory
current_dir = os.path.dirname(os.path.abspath(__file__))
reana_client_path = os.path.abspath(os.path.join(current_dir, '..', '..', '..', 'reana-client'))

# Add the reana-client directory to the sys.path
if reana_client_path not in sys.path:
sys.path.insert(0, reana_client_path)

from reana_client.api.client import get_workflow_logs

# Global variable to control the loop
continue_logging = True

def signal_handler(sig, frame):
global continue_logging
continue_logging = False
print("Stopping log collection...")

def run_command_with_retries(command, max_retries=5, delay=2):
attempt = 0
while attempt < max_retries:
try:
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return process
except Exception as e:
print(f"An error occurred: {e}. Attempt {attempt + 1} of {max_retries}")
attempt += 1
time.sleep(delay)
print("All attempts to run the command have failed.")
return None

@click.command()
@click.option('--log-file', required=True, help='The name of the file to save the logs.')
@click.option('--workflow', required=True, help='The name of the workflow')
def run_command_and_save_logs(log_file, workflow):
global continue_logging

# Get the access token from environment variable
access_token = os.getenv('REANA_ACCESS_TOKEN')
if not access_token:
print("Error: REANA_ACCESS_TOKEN environment variable is not set.")
sys.exit(1)

# Call get_workflow_logs with the workflow and access_token
service = get_workflow_logs(workflow, access_token)
# Extract batch_id from the service response
batch_id = service.get('workflow_id')
if not batch_id:
print("Error: 'workflow_id' not found in the service response.")
sys.exit(1)
print("The batch_id is suppose to be this:", batch_id)

# Register the signal handler
signal.signal(signal.SIGINT, signal_handler)

command = (
"kubectl get pod -o='custom-columns=NAME:.metadata.name,PHASE:.status.phase,"
"CREATED:.metadata.creationTimestamp,STARTED:.status.startTime,"
"STARTED_CONTAINERS:.status.containerStatuses[*].state.*.startedAt,"
"FINISHED_CONTAINERS:.status.containerStatuses[*].state.*.finishedAt' -w"
)

with open(log_file, "a") as file: # 'a' is for appending to the file without truncating it
# Popen - command is executed in a new shell process, standard output and standard error are redirected to pipes
while continue_logging:
process = run_command_with_retries(command)
if not process:
print("Failed to start the command")
break
try:
while continue_logging:
line = process.stdout.readline()
if not line:
break
line = line.strip() # Strip the line of leading/trailing whitespace
print(line)
file.write(line + "\n")
if f"reana-run-batch-{batch_id}" in line and "Failed" in line:
file.write(line + '\n') # Ensure newline is added when writing to file
continue_logging = False
break
except Exception as e:
print(f"An error occurred while reading the process output: {e}")
finally:
process.terminate()
process.wait()

if __name__ == "__main__":
run_command_and_save_logs()
129 changes: 81 additions & 48 deletions scripts/logs_lifetime_benchmarking_test.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,78 @@
import pandas as pd
import subprocess
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter, SecondLocator
import click
import os
import re
import sys
import logging
from reana_client.api.client import get_workflow_logs

# Ensure REANA_ACCESS_TOKEN is set
access_token = os.getenv('REANA_ACCESS_TOKEN')
if not access_token:
print("Error: REANA_ACCESS_TOKEN environment variable is not set.")
sys.exit(1)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to extract job IDs from lines
def extract_job_ids_from_lines(lines):
job_ids = set()
for line in lines:
match = re.search(r'reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}', line)
if match:
job_ids.add(match.group(0))
return job_ids

"""Run this script to generate the plots of current job status"""
"""First compare the logs from reana-client logs command and the job pod ID's from statistics file"""

def run_reana_client_logs(command):
command = f"reana-client logs -w {workflow}"
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode == 0:
return result.stdout
else:
print("Comamnd failed to run, error:")
print(result.stderr)
return None

# Function to parse log file
def parse_log_file(file_path):
with open(file_path, 'r') as f:
lines = f.readlines()
return lines

# Function to write filtered lines to a new file
def write_filtered_log_file(filtered_lines, filtered_file_path):
with open(filtered_file_path, 'w') as f:
f.writelines(filtered_lines)

# Function to extract unique jobs from lines
def extract_unique_jobs(lines):
unique_jobs = {}
for line in lines:
if line.strip().startswith('reana-run-job-'):
job_id = line.strip().split()[0]
match = re.match(r'(reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12})', line)
if match:
job_id = match.group(0)
unique_jobs[job_id] = line.strip()
return unique_jobs.values()

# Function to extract succeeded timestamps from unique jobs
def extract_succeeded_timestamps(unique_jobs):
succeeded_timestamps = [line.split()[5] for line in unique_jobs if line.split()[5] != "<none>"]
succeeded_timestamps = [ts.split(',')[0] for ts in succeeded_timestamps]
return pd.to_datetime(succeeded_timestamps, errors='coerce')

# Function to get sorted data by timestamp
def get_sorted_data(lines):
sorted_data = sorted(lines, key=lambda x: x.split()[1])
return sorted_data

sorted_lines = []
for line in lines:
parts = line.split()
try:
timestamp = pd.to_datetime(parts[2])
sorted_lines.append((timestamp, line))
except Exception as e:
logger.error(f"Error parsing date from line: {line}. Error: {e}")
sorted_lines.sort(key=lambda x: x[0])
return [line for _, line in sorted_lines]

# Function to filter jobs based on status
def filter_jobs(sorted_data, status):
return [line for line in sorted_data if line.split()[1] == status]

# Function to extract running timestamps
def extract_running_timestamps(running_jobs):
timestamps_running = []
encountered_jobs_running = set()
Expand All @@ -64,6 +95,7 @@ def extract_running_timestamps(running_jobs):
timestamps_running.sort()
return timestamps_running

# Function to extract pending timestamps
def extract_pending_timestamps(pending_jobs):
timestamps_pending = []
encountered_jobs_pending = set()
Expand All @@ -90,6 +122,7 @@ def extract_pending_timestamps(pending_jobs):
timestamps_pending.sort()
return timestamps_pending

# Function to calculate cumulative timestamps
def calculate_cumulative(timestamps):
x = []
y = []
Expand All @@ -99,19 +132,13 @@ def calculate_cumulative(timestamps):
x.append(timestamp[0])
y.append(cumulative_sum)
return x, y

def plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending):
plt.figure(figsize=figsize)

# Plot succeeded jobs
# Function to plot data
def plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending, title, figsize):
plt.figure(figsize=figsize)
plt.plot(succeeded_counts.index, succeeded_counts.cumsum(), label='Finished', linestyle='-', color='green', alpha=0.5)

# Plot running jobs
plt.plot(x_running, y_running, linestyle='-', color='blue', alpha=0.5, linewidth=3, label='Running')

# Plot pending jobs
plt.plot(x_pending, y_pending, linestyle='-', color='orange', alpha=0.5, linewidth=3, label='Pending')

plt.xlabel('Processing time')
plt.ylabel('Number of Jobs')
plt.title(title)
Expand All @@ -127,39 +154,45 @@ def plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending):
@click.argument('file_path')
@click.option('--title', default='Analysis Results', help='Title of the analysis results')
@click.option('--figsize', nargs=2, type=float, default=(12, 8), help='Figure size as two float values')
@click.option('--workflow', required=False, help='Name of the REANA workflow the same as the processed .txt file')
@click.option('--workflow', required=True, help='Name of the REANA workflow')
def main(file_path, title, figsize, workflow):
"""Compare the reana-client logs and the jobs from the analysis results
Run benchmarking tests. Generate matplotlib plot
""" This script allows to plot the workflow lifetime statistics.
As a results of evaluating the logs file with pod life cycle information,
the statistics of how many jobe were running in parallel can be found.
The script requires matplotlib and pandas packages
Steps to run benchmarking workflow lifetime test:
Steps to run benchmarking workflow lifetime test:
.. code-block:: console
\b
#To run this script
$ kubectl #To save a live logs
$ cd reana/scripts
$ python lifetime.py logs.txt # insert your .txt file with logs
$ python lifetime.py logs.txt # insert your .txt file with logs and the name of the workflow
"""
reana_logs = run_reana_client_logs(workflow)
reana_job_ids = set()
for line in reana_logs.splitlines():
if line.strip().startswith('reana-run-job'):
job_id = line.strip().split()[0]
reana_job_ids.add(job_id)
service = get_workflow_logs(workflow, access_token)
log_string = service['logs']
reana_run_job_ids = re.findall(r'reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}', log_string)

lines = parse_log_file(file_path)
file_job_ids = extract_job_ids_from_lines(lines)

diff_job_ids = set(reana_run_job_ids).symmetric_difference(file_job_ids)
if diff_job_ids:
print("Differing Job IDs:")
for job_id in diff_job_ids:
print(job_id)
else:
print("No differing Job IDs found.")

unique_jobs = extract_unique_jobs(lines)
filtered_lines = [line for line in lines if not any(job_id in line for job_id in diff_job_ids)]
filtered_file_path = 'filtered_' + file_path
write_filtered_log_file(filtered_lines, filtered_file_path)

unique_jobs = extract_unique_jobs(filtered_lines)
succeeded_timestamps = extract_succeeded_timestamps(unique_jobs)
first_succeeded_timestamp = succeeded_timestamps.dropna().min()
succeeded_counts = succeeded_timestamps.value_counts().sort_index()

sorted_data = get_sorted_data(lines)

sorted_data = get_sorted_data(filtered_lines)
running_jobs = filter_jobs(sorted_data, 'Running')
timestamps_running = extract_running_timestamps(running_jobs)
x_running, y_running = calculate_cumulative(timestamps_running)
Expand All @@ -168,7 +201,7 @@ def main(file_path, title, figsize, workflow):
timestamps_pending = extract_pending_timestamps(pending_jobs)
x_pending, y_pending = calculate_cumulative(timestamps_pending)

plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending)
plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending, title, figsize)

if __name__ == "__main__":
main()

0 comments on commit 23df409

Please sign in to comment.