diff --git a/Makefile b/Makefile deleted file mode 100644 index c8505c7a..00000000 --- a/Makefile +++ /dev/null @@ -1,40 +0,0 @@ -# Makefile - -objects = bin/cpu-benchmark.o -CXX= g++ -CPPFLAGS= -std=c++11 -execname = bin/cpu-benchmark - -# compile -$(execname): $(objects) - $(CXX) $(CPPFLAGS) -o $(execname) $(objects) - -#clean Makefile -clean: - rm -rf $(objects) $(execname) - - -# #define variables -# objects= gpu-benchmark.o kernels.o -# NVCC= nvcc #cuda c compiler -# CPPFLAGS= -std=c++11 -# opt= -O2 #optimization flag -# LIBS= -# execname= gpu-benchmark - -# .PHONY: clean - -# #compile -# $(execname): $(objects) -# $(NVCC) $(CPPFLAGS) $(opt) -o $(execname) $(objects) $(LIBS) - -# kernels.o: kernels.cu -# $(NVCC) $(CPPFLAGS) $(opt) -c kernels.cu -# gpu-benchmark.o: gpu-benchmark.cu -# $(NVCC) $(CPPFLAGS) $(opt) -c gpu-benchmark.cu - - -# #clean Makefile -# clean: -# rm $(objects) -# #end of Makefile diff --git a/bin/cpu-benchmark.cpp b/bin/cpu-benchmark.cpp deleted file mode 100644 index 359d6175..00000000 --- a/bin/cpu-benchmark.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -#define PRECISION 100000L - -/** - * This function computes pi using work trials for a Monte Carlo method. It's - * GOOD because it uses a good number generator. Unfortunately, this method - * leads to extra memory references that cause extra RAM references, and thus - * extra cache misses. - */ - -void compute_good_pi(long work) { - - std::uniform_real_distribution random_dist(-0.5, 0.5); - std::mt19937 rng; - rng.seed((long)&work); - - double good_pi = 0.0; - double x, y; - for (long sample = 0; sample < work; sample++) { - x = random_dist(rng); - y = random_dist(rng); - good_pi += (double)(std::sqrt(x * x + y * y) < 0.5); - - // Print progress every 1% of completion - if (sample % (work / 100) == 0) { - double progress = (double)sample / work * 100; - std::cout << "\rProgress: " << std::fixed << std::setprecision(2) << progress << "%" << std::flush; - } - } - std::cout << "\rProgress: 100.00%\n"; // Ensure full progress is displayed -} - -/** - * This function computes pi using work trials for a Monte Carlo method. It's - * TERRIBLE because it uses a custom, bad, number generator, which has too - * much bias to compute a good value a PI. The reason for using the generator - * is that it does not cause extra memory references, and thus keeps this benchmark - * 100% CPU intensive. - */ -void compute_terrible_pi(long work) { - - long rng = (long)&work; - double terrible_pi = 0.0; - double x_value, y_value; - for (long sample = 0; sample < work; sample++) { - rng = (((rng * 214013L + 2531011L) >> 16) & 32767); - x_value = -0.5 + (rng % PRECISION) / (double)PRECISION; - rng = (((rng * 214013L + 2531011L) >> 16) & 32767); - y_value = -0.5 + (rng % PRECISION) / (double)PRECISION; - terrible_pi += (double)(std::sqrt(x_value * x_value + y_value * y_value) < 0.5); - - // Print progress every 1% of completion - if (sample % (work / 100) == 0) { - double progress = (double)sample / work * 100; - std::cout << "\rProgress: " << std::fixed << std::setprecision(2) << progress << "%" << std::flush; - } - } - std::cout << "\rProgress: 100.00%\n"; // Ensure full progress is displayed -} - -int main(int argc, char **argv) { - - // Process command-line args - if (argc != 2) { - std::cerr << "Usage: " << argv[0] << " \n"; - exit(1); - } - - long work; - try { - work = std::stol(argv[1]); - } catch (std::invalid_argument &e) { - std::cerr << "Invalid argument: " << e.what() << "\n"; - exit(1); - } - - // Compute Pi using terrible method - compute_terrible_pi(1000000 * work); - std::cout << "Pi computation completed!\n"; - - exit(0); -} diff --git a/bin/wfbench b/bin/wfbench index 4bc03cba..f5f63eba 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright (c) 2021-2025 The WfCommons Team. @@ -12,19 +12,24 @@ import os import pathlib import subprocess import time -import sys import signal -import queue +import sys import argparse import re import json import logging import pandas as pd +import psutil from io import StringIO from filelock import FileLock from pathos.helpers import mp as multiprocessing -from typing import List, Optional + +from abc import ABC, abstractmethod +from typing import List, Optional, IO + +int32_max = 2147483647 +this_dir = pathlib.Path(__file__).resolve().parent # Configure logging @@ -35,10 +40,6 @@ logging.basicConfig( handlers=[logging.StreamHandler()] ) - -this_dir = pathlib.Path(__file__).resolve().parent - - def log_info(msg: str): """ Log an info message to stderr @@ -66,6 +67,213 @@ def log_error(msg: str): """ logging.error(msg) +# Utility process class +####################### + +class ProcessHandle: + def __init__(self, proc: multiprocessing.Process | subprocess.Popen): + self._proc = proc + + @property + def pid(self): + return self._proc.pid + + def terminate(self): + self._proc.terminate() + + def terminate_along_with_children(self): + if isinstance(self._proc, multiprocessing.Process): + self._proc.terminate() + return + try: + pgid = os.getpgid(self._proc.pid) + os.killpg(pgid, signal.SIGKILL) + except ProcessLookupError: + pass # group leader already gone, try children directly + except PermissionError: + pass + finally: + # Catch any re-parented children (ppid=1) that psutil can still see + try: + for child in psutil.Process(self._proc.pid).children(recursive=True): + try: + child.kill() + except psutil.NoSuchProcess: + pass + except psutil.NoSuchProcess: + pass + + def wait(self): + if isinstance(self._proc, multiprocessing.Process): + self._proc.join() + else: + self._proc.wait() + + def is_alive(self): + if isinstance(self._proc, multiprocessing.Process): + return self._proc.is_alive() + else: + return self._proc.poll() is None + +# Benchmark classes +################### + +class Benchmark(ABC): + @abstractmethod + def run(self) -> multiprocessing.Process: + pass + +class IOReadBenchmark: + def __init__(self): + self.to_read : dict[str, (IO, int)] = {} + + def add_read_operation(self, filepath: str, opened_file: IO, num_bytes: int): + self.to_read[filepath] = (opened_file, num_bytes) + + def run(self) -> ProcessHandle | None: + if len(self.to_read) <= 0: + return None + p = multiprocessing.Process(target=self.benchmark_function, args=()) + p.start() + return ProcessHandle(p) + + def benchmark_function(self): + for filepath, (opened_file, bytes_to_read) in self.to_read.items(): + log_debug(f"Reading {bytes_to_read} bytes from {filepath}...") + opened_file.read(bytes_to_read) + + +class IOWriteBenchmark: + def __init__(self): + self.to_write : dict[str, (IO, int)] = {} + + def add_write_operation(self, filepath: str, opened_file: IO, num_bytes: int): + self.to_write[filepath] = (opened_file, num_bytes) + + def run(self) -> ProcessHandle | None: + if len(self.to_write) <= 0: + return None + p = multiprocessing.Process(target=self.benchmark_function, args=()) + p.start() + return ProcessHandle(p) + + def benchmark_function(self): + for filepath, (opened_file, bytes_to_write) in self.to_write.items(): + log_debug(f"Writing {bytes_to_write} bytes to {filepath}...") + opened_file.write(os.urandom(int(bytes_to_write))) + opened_file.flush() + + +class CPUBenchmark: + def __init__(self, cpu_threads: Optional[int] = 5, + mem_threads: Optional[int] = 5, + core: Optional[int] = None, + total_mem: Optional[int] = None): + self.cpu_threads = cpu_threads + self.mem_threads = mem_threads + self.core = core + self.total_mem = total_mem + self.work = None + + def set_work(self, work: int): + self.work = work + + def set_infinite_work(self): + self.work = int32_max # "infinite" + + def run(self) -> list[ProcessHandle | None]: + if self.work is None or self.work <= 0: + return [None, None] + + total_mem = f"{self.total_mem}B" if self.total_mem else f"{100.0 / os.cpu_count()}%" + cpu_work_per_thread = int(1000000 * self.work / (16384 * self.cpu_threads)) if self.cpu_threads != 0 else int32_max ** 2 + cpu_samples = min(cpu_work_per_thread, int32_max) + cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max + if cpu_ops > int32_max: + log_info("Exceeded maximum allowed value of cpu work.") + cpu_ops = int32_max + + + # Start CPU benchmark, if need be + cpu_proc_handle = None + if self.cpu_threads > 0: + log_debug(f"Running CPU benchmark with {self.cpu_threads} threads for {self.work if self.work < int32_max else 'infinite'} units of work...") + cpu_prog = ["stress-ng", "--monte-carlo", f"{self.cpu_threads}", + "--monte-carlo-method", "pi", + "--monte-carlo-rand", "lcg", + "--monte-carlo-samples", f"{cpu_samples}", + "--monte-carlo-ops", f"{cpu_ops}", + "--quiet"] + cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) + cpu_proc_handle = ProcessHandle(cpu_proc) + + # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) + if self.core: + os.sched_setaffinity(cpu_proc.pid, {self.core}) + + # Start Memory benchmark, if need be + mem_proc_handle = None + if self.mem_threads > 0: + # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows + log_debug(f"Running memory benchmark with {self.mem_threads} threads...") + mem_prog = ["stress-ng", "--vm", f"{self.mem_threads}", + "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] + mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) + mem_proc_handle = ProcessHandle(mem_proc) + if self.core: + os.sched_setaffinity(mem_proc.pid, {self.core}) + + return [cpu_proc_handle, mem_proc_handle] + + +class GPUBenchmark: + + @staticmethod + def get_available_gpus(): + proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, _ = proc.communicate() + df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ") + return df[df["utilization.gpu"] <= 5].index.to_list() + + def __init__(self): + self.work = None + self.duration = None + self.device = None + + def set_device(self): + available_gpus = self.get_available_gpus() # checking for available GPUs + if not available_gpus: + log_error("No GPU available") + sys.exit(1) + self.device = available_gpus[0] + log_debug(f"GPU benchmark instantiated for device {self.device}") + + def set_work(self, work: int): + self.work = work + + def set_time(self, duration: float): + self.duration = duration + + def run(self) -> ProcessHandle | None: + if self.work is None and self.duration is None: + return None + + if self.duration is not None: + log_debug(f"Running GPU benchmark for {self.duration} seconds") + gpu_prog = [ + f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={self.device} {this_dir.joinpath('./gpu_benchmark')} {self.work} {self.duration}"] + else: + log_debug(f"Running GPU benchmark for {self.work} units of work") + gpu_prog = [ + f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={self.device} {this_dir.joinpath('./gpu_benchmark')} {self.work}"] + + p = subprocess.Popen(gpu_prog, shell=True) + return ProcessHandle(p) + + +# Utility code functions +######################## def lock_core(path_locked: pathlib.Path, path_cores: pathlib.Path) -> int: @@ -128,187 +336,6 @@ def unlock_core(path_locked: pathlib.Path, finally: lock.release() -def monitor_progress(proc, cpu_queue): - """Monitor progress from the CPU benchmark process.""" - for line in iter(proc.stdout.readline, ""): # No decode needed - line = line.strip() - if line.startswith("Progress:"): - try: - progress = float(line.split()[1].strip('%')) - cpu_queue.put(progress) - except (ValueError, IndexError): - pass - -def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, - cpu_threads: Optional[int] = 5, - mem_threads: Optional[int] = 5, - cpu_work: Optional[int] = 100, - core: Optional[int] = None, - total_mem: Optional[int] = None) -> List: - """ - Run CPU and memory benchmark. - - :param cpu_queue: Queue to push CPU benchmark progress as a float. - :type cpu_queue: multiprocessing.Queue - :param cpu_threads: Number of threads for CPU benchmark. - :type cpu_threads: Optional[int] - :param mem_threads: Number of threads for memory benchmark. - :type mem_threads: Optional[int] - :param cpu_work: Total work units for CPU benchmark. - :type cpu_work: Optional[int] - :param core: Core to pin the benchmark processes to. - :type core: Optional[int] - :param total_mem: Total memory to use for memory benchmark. - :type total_mem: Optional[float] - - :return: Lists of CPU and memory subprocesses. - :rtype: List - """ - total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"] - mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", f"{total_mem}", "--vm-keep"] - - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - - # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) - if core: - os.sched_setaffinity(cpu_proc.pid, {core}) - cpu_procs.append(cpu_proc) - - # Start a thread to monitor the progress of each CPU benchmark process - monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue)) - monitor_thread.start() - - if mem_threads > 0: - # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows - mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) - if core: - os.sched_setaffinity(mem_proc.pid, {core}) - mem_procs.append(mem_proc) - - return cpu_procs, mem_procs - - -def io_read_benchmark_user_input_data_size(inputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = -1 - memory_limit = int(memory_limit) - log_debug("Starting IO Read Benchmark...") - for file, size in inputs.items(): - with open(rundir.joinpath(file), "rb") as fp: - log_debug(f"Reading '{file}'") - chunk_size = min(size, memory_limit) - while fp.read(chunk_size): - pass - log_debug("Completed IO Read Benchmark!") - - -def io_write_benchmark_user_input_data_size(outputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = sys.maxsize - memory_limit = int(memory_limit) - for file_name, file_size in outputs.items(): - log_debug(f"Writing output file '{file_name}'") - file_size_todo = file_size - while file_size_todo > 0: - with open(rundir.joinpath(file_name), "ab") as fp: - chunk_size = min(file_size_todo, memory_limit) - file_size_todo -= fp.write(os.urandom(int(chunk_size))) - - -def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): - """Alternate between reading and writing to a file, ensuring read only happens after write.""" - - if memory_limit is None: - memory_limit = 10 * 1024 * 1024 # sys.maxsize - memory_limit = int(memory_limit) - - # queue will have messages in the form (cpu_percent_completed) - # Get the last message and trash the rest - - # Create empty files - for name in outputs: - open(rundir.joinpath(name), "wb").close() - - io_completed = 0 - bytes_read = { - name: 0 - for name in inputs - } - bytes_written = { - name: 0 - for name in outputs - } - - # get size of inputs - inputs = { - name: os.path.getsize(rundir.joinpath(name)) - for name in inputs - } - - while io_completed < 100: - cpu_percent = max(io_completed, cpu_queue.get()) - while True: # Get the last message - try: - cpu_percent = max(io_completed, cpu_queue.get_nowait()) - except queue.Empty: - break - - log_debug(f"CPU Percent: {cpu_percent}") - if cpu_percent: - bytes_to_read = { - name: int(size * (cpu_percent / 100) - bytes_read[name]) - for name, size in inputs.items() - } - bytes_to_write = { - name: int(size * (cpu_percent / 100) - bytes_written[name]) - for name, size in outputs.items() - } - io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit) - io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit) - - bytes_read = { - name: bytes_read[name] + bytes_to_read[name] - for name in bytes_to_read - } - bytes_written = { - name: bytes_written[name] + bytes_to_write[name] - for name in bytes_to_write - } - - log_debug(f"Bytes Read: {bytes_read}") - log_debug(f"Bytes Written: {bytes_written}") - - io_completed = cpu_percent - - if io_completed >= 100: - break - -def get_available_gpus(): - proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, _ = proc.communicate() - df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ") - return df[df["utilization.gpu"] <= 5].index.to_list() - - -def gpu_benchmark(time: int = 100, - work: int = 100, - device: int = 0): #work, device - - gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"] - log_debug(f"Running GPU Benchmark: {gpu_prog}") - subprocess.Popen(gpu_prog, shell=True) - def get_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() @@ -319,29 +346,35 @@ def get_parser() -> argparse.ArgumentParser: parser.add_argument("--path-lock", default=None, help="Path to lock file.") parser.add_argument("--path-cores", default=None, help="Path to cores file.") parser.add_argument("--cpu-work", default=None, help="Amount of CPU work.") + parser.add_argument("--num-chunks", default=10, help="Number of chunks used for pipelining I/O and " + "computation throughout the execution (fewer chunks may be used " + "if amounts of work and or input/output file sizes are too small).") parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.") - parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete the task (overrides CPU and GPU works).") - parser.add_argument("--mem", type=float, default=None, help="Max amount (in MB) of memory consumption.") - parser.add_argument("--output-files", help="output file names with sizes in bytes as a JSON dictionary " + parser.add_argument("--time-limit", default=None, help="Time limit (in seconds) to complete " + "the computational portion of the benchmark (overrides CPU and GPU works). " + "Is only approximate since I/O time may make the overall time longer.") + parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).") + parser.add_argument("--output-files", help="Output file names with sizes in bytes as a JSON dictionary " "(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).") - parser.add_argument("--input-files", help="input files names as a JSON array " + parser.add_argument("--input-files", help="Input files names as a JSON array " "(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).") - parser.add_argument("--debug", action="store_true", help="Enable debug messages.") + parser.add_argument("--silent", action="store_true", help="Disable all log messages.") + parser.add_argument("--debug", action="store_true", help="Enable debug log messages.") parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept monitoring.") parser.add_argument("--workflow_id", default=None, help="Id to group tasks in a workflow.") return parser - -def begin_flowcept(args): - log_info("Running with Flowcept.") + +def begin_flowcept(workflow_id, name, used): + log_debug("Running with Flowcept.") from flowcept import Flowcept, FlowceptTask # TODO: parametrize to allow storing individual tasks - f = Flowcept(workflow_id=args.workflow_id, - bundle_exec_id=args.workflow_id, + f = Flowcept(workflow_id=workflow_id, + bundle_exec_id=workflow_id, start_persistence=False, save_workflow=False) f.start() - t = FlowceptTask(task_id=f"{args.workflow_id}_{args.name}", workflow_id=args.workflow_id, used={**args.__dict__}) + t = FlowceptTask(task_id=f"{workflow_id}_{name}", workflow_id=workflow_id, used=used) return f, t @@ -350,154 +383,231 @@ def end_flowcept(flowcept, flowcept_task): flowcept.stop() -def main(): - """Main program.""" - parser = get_parser() - args = parser.parse_args() - core = None - - if args.with_flowcept: - flowcept, flowcept_task = begin_flowcept(args) +def compute_num_chunks(time_limit, cpu_work, gpu_work, num_chunks): + # Compute the (feasible number of chunks) + min_chunk_size_time = 1.0 # At least 1 second per chunk, if we're doing time-based + # TODO: Pick reasonable factors below so that a chunk takes about min_chunk_size_time sec on a reasonable machine + min_chunk_size_cpu_work = 3000000 * min_chunk_size_time # 1s on my MacBook Pro + min_chunk_size_gpu_work = 30000000 * min_chunk_size_time # unknown..... - if args.debug: - logging.getLogger().setLevel(logging.DEBUG) - - if args.rundir: - rundir = pathlib.Path(args.rundir) + if time_limit: + num_chunks = min(int(num_chunks), int(float(time_limit) / min_chunk_size_time)) else: - rundir = pathlib.Path(os.getcwd()) - - if args.path_lock and args.path_cores: - path_locked = pathlib.Path(args.path_lock) - path_cores = pathlib.Path(args.path_cores) - core = lock_core(path_locked, path_cores) - - log_info(f"Starting {args.name} Benchmark") - - mem_bytes = args.mem * 1024 * 1024 if args.mem else None - - procs = [] - io_proc = None - outputs_dict = {} - - cpu_queue = multiprocessing.Queue() - - log_debug(f"Working directory: {os.getcwd()}") - - # Deal with input/output files if any - cleaned_input = "{}" if args.input_files is None else re.sub(r'\\+', '', args.input_files) - cleaned_output = "{}" if args.output_files is None else re.sub(r'\\+', '', args.output_files) - # print("CLEANED INPUT", cleaned_input) - # print("CLEANED OUTPUT", cleaned_output) + if cpu_work: + num_chunks_cpu = min(num_chunks, cpu_work // min_chunk_size_cpu_work) + else: + num_chunks_cpu = 1 + if gpu_work: + num_chunks_gpu = min(int(num_chunks), int(float(gpu_work) / min_chunk_size_gpu_work)) + else: + num_chunks_gpu = 1 + num_chunks = min(num_chunks_cpu, num_chunks_gpu) - if cleaned_input or cleaned_output: - log_debug("Starting IO benchmark...") + num_chunks = max(num_chunks, 1) # The above computations may say "zero" + return num_chunks - # Attempt to parse the cleaned string - try: - outputs_dict = json.loads(cleaned_output) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --output-files JSON string argument: {e}") - sys.exit(1) +def kill_current_handles(handles: list[ProcessHandle]): + for handle in handles: + if handle is not None and handle.is_alive(): + handle.terminate_along_with_children() - try: - inputs_array = json.loads(cleaned_input) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --input-files JSON string argument: {e}") - sys.exit(1) - # print("OUTPUT", outputs_dict) - # print("INPUTS", inputs_array) +def run(workflow_id, name, with_flowcept, silent, debug, rundir, path_lock, path_cores, + time_limit, cpu_work, percent_cpu, mem, gpu_work, num_chunks, + input_files, output_files): + """Main function.""" - # Create a multiprocessing event that in the first run is set to True - write_done_event = multiprocessing.Event() - # Set this to True to allow the first read to happen - write_done_event.set() - # Print the value of the event - # print("Event Value:", write_done_event.is_set()) + if with_flowcept: + flowcept, flowcept_task = begin_flowcept(workflow_id, name, locals()) + else: + flowcept = None + flowcept_task = None - io_proc = multiprocessing.Process( - target=io_alternate, - args=(inputs_array, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event) - ) - io_proc.start() - procs.append(io_proc) + if silent: + logging.getLogger().setLevel(logging.NOTSET) + if debug: + logging.getLogger().setLevel(logging.DEBUG) - if args.gpu_work: - log_info(f"Starting GPU Benchmark for {args.name}...") - available_gpus = get_available_gpus() #checking for available GPUs + if rundir: + rundir = pathlib.Path(rundir) + else: + rundir = pathlib.Path(os.getcwd()) - if not available_gpus: - log_error("No GPU available") - sys.exit(1) + if path_lock and path_cores: + path_locked = pathlib.Path(path_lock) + path_cores = pathlib.Path(path_cores) + core = lock_core(path_locked, path_cores) + else: + path_locked = None + path_cores = None + core = None + + # Compute the (feasible) number of chunks based on the arguments + num_chunks = compute_num_chunks(time_limit, cpu_work, gpu_work, num_chunks) + log_debug(f"Executing benchmark with {num_chunks} chunks.") + + # At this point we know the number of chunks, and we can just iterate as follows (N = num_chunks + 2) + # step 0 sep 1 step 2 step N-3 step N-2 step N-1 + # READ READ READ ... READ - - + # - COMPUTE_CPU COMPUTE_CPU ... COMPUTE_CPU COMPUTE_CPU - + # - COMPUTE_GPU COMPUTE_GPU ... COMPUTE_GPU COMPUTE_GPU - + # - - WRITE ... WRITE WRITE WRITE + # (Intermediate READ and WRITE steps may do nothing for some files if there is too little data) + + # Construct a list of benchmark steps, where each step is a list of IO benchmarks (Read or Write) + # and a list of non-IO benchmarks (CPU, GPU). Initially these are all "do nothing" benchmarks + N = num_chunks + 2 + steps = [{"io_read_benchmark": IOReadBenchmark(), + "io_write_benchmark": IOWriteBenchmark(), + "cpu_benchmark": CPUBenchmark(cpu_threads=int(10 * percent_cpu), + mem_threads=int(10 - 10 * percent_cpu), + core=core, + total_mem=mem * 1000 * 1000 if mem else None), + "gpu_benchmark": GPUBenchmark()} for i in range(N)] + + min_chunk_size_data = 1000 # 1KB per chunk at a minimum for each input / output file, otherwise the file + # is read/written all at once at the beginning/end + + # Augment I/O read benchmarks for each input file + cleaned_input = "{}" if input_files is None else re.sub(r'\\+', '', input_files) + try: + input_files = json.loads(cleaned_input) + except json.JSONDecodeError as e: + log_error(f"Failed to decode --input-files JSON string argument: {e}") + sys.exit(1) + + for file_path in input_files: + file_size = os.path.getsize(file_path) + # If file is zero-size, do nothing + if file_size == 0: + continue + opened_file = open(rundir / file_path, "rb") + # If file is "small" only read it at the beginning + if file_size < num_chunks * min_chunk_size_data: + steps[0]["io_read_benchmark"].add_read_operation(file_path, opened_file, file_size) + continue + # Otherwise, read it in chunks + for step in range(0, N-2): + num_bytes = file_size // num_chunks + (file_size % num_chunks > step) + steps[step]["io_read_benchmark"].add_read_operation(file_path, opened_file, num_bytes) + + # Augment I/O write benchmarks for each output file + cleaned_output = "{}" if output_files is None else re.sub(r'\\+', '', output_files) + try: + output_files = json.loads(cleaned_output) + except json.JSONDecodeError as e: + log_error(f"Failed to decode --output-files JSON string argument: {e}") + sys.exit(1) + + for file_path, file_size in output_files.items(): + # Open the file for writing no matter what (it should be created) + opened_file = open(rundir / file_path, "wb") + # If file is zero-size, do nothing + if file_size == 0: + continue + # If file is "small" only write it at the end + if file_size < num_chunks * min_chunk_size_data: + steps[N-1]["io_write_benchmark"].add_write_operation(file_path, opened_file, file_size) + continue + # Otherwise, write it in chunks + for step in range(2, N): + num_bytes = file_size // num_chunks + (file_size % num_chunks > (step - 2)) + steps[step]["io_write_benchmark"].add_write_operation(file_path, opened_file, num_bytes) + + # Augment CPU benchmark with computation (if need be) + if cpu_work: + if time_limit: + for step in range(1, N-1): + steps[step]["cpu_benchmark"].set_infinite_work() else: - device = available_gpus[0] - log_debug(f"Running on GPU {device}") - - if args.time: - log_debug(f" Time:{args.time}, Work:{args.gpu_work}, Device:{device}") - gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device) - else: - gpu_benchmark(work=int(args.gpu_work), device=device) - - if args.cpu_work: - log_info(f"Starting CPU and Memory Benchmarks for {args.name}...") - if core: - log_debug(f"{args.name} acquired core {core}") - - mem_threads=int(10 - 10 * args.percent_cpu) - cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue, - cpu_threads=int(10 * args.percent_cpu), - mem_threads=mem_threads, - cpu_work=sys.maxsize if args.time else int(args.cpu_work), - core=core, - total_mem=mem_bytes) - - procs.extend(cpu_procs) - if args.time: - time.sleep(int(args.time)) - for proc in procs: - if isinstance(proc, multiprocessing.Process): - if proc.is_alive(): - proc.terminate() - elif isinstance(proc, subprocess.Popen): - proc.terminate() + for step in range(1, N-1): + chunk_work = int(cpu_work) // num_chunks + (int(cpu_work) % num_chunks > step - 1) + steps[step]["cpu_benchmark"].set_work(chunk_work) + + # Augment GPU benchmark with computation (if need be) + if gpu_work: + if time_limit: + for step in range(1, N - 1): + steps[step]["gpu_benchmark"].set_device() + steps[step]["gpu_benchmark"].set_work(int(gpu_work)) + steps[step]["gpu_benchmark"].set_time(float(time_limit)) else: - for proc in procs: - if isinstance(proc, subprocess.Popen): - proc.wait() - if io_proc is not None and io_proc.is_alive(): - # io_proc.terminate() - io_proc.join() - - for mem_proc in mem_procs: - try: - os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails - except subprocess.TimeoutExpired: - log_debug("Memory process did not terminate; force-killing.") - # As a fallback, use pkill if any remaining instances are stuck - subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() - - log_debug("Completed CPU and Memory Benchmarks!") - - # NOTE: If you would like to run only IO add time.sleep(2) - # Check if all procs are done, if not, kill them - log_debug("Checking if all processes are done...") - for proc in procs: - if isinstance(proc, multiprocessing.Process): - if proc.is_alive(): - proc.terminate() - proc.join() - if isinstance(proc, subprocess.Popen): - proc.wait() - + for step in range(1, N - 1): + chunk_work = int(gpu_work) // num_chunks + (int(gpu_work) % num_chunks > step - 1) + steps[step]["gpu_benchmark"].set_device() + steps[step]["gpu_benchmark"].set_work(chunk_work) + + # All benchmarks have been specified, we can just go through the steps blindly + # log_info(f"Starting {args.name} Benchmark") + + current_proc_handles = [] + try: + for step_index, step in enumerate(steps): + log_debug(f"**** STEP {step_index} ***") + io_read_process = step["io_read_benchmark"].run() + current_proc_handles += [io_read_process] + io_write_process = step["io_write_benchmark"].run() + [cpu_benchmark_process, memory_benchmark_process] = step["cpu_benchmark"].run() + current_proc_handles += [cpu_benchmark_process, memory_benchmark_process] + gpu_benchmark_process = step["gpu_benchmark"].run() + current_proc_handles += [gpu_benchmark_process] + current_proc_handles[:] = [io_read_process, cpu_benchmark_process, memory_benchmark_process, gpu_benchmark_process] + + # If time based, sleep the required amount of time and kill the process + if time_limit: + if cpu_benchmark_process is not None or gpu_benchmark_process is not None: + time.sleep(float(time_limit) / num_chunks) + if cpu_benchmark_process is not None: + cpu_benchmark_process.terminate_along_with_children() + if gpu_benchmark_process is not None: + gpu_benchmark_process.terminate() + + # Wait for the I/O processes to be done + if io_read_process is not None: + io_read_process.wait() + if io_write_process is not None: + io_write_process.wait() + + # Wait for the CPU process to be done + if cpu_benchmark_process is not None: + cpu_benchmark_process.wait() + + # Kill the Memory process + if memory_benchmark_process is not None: + memory_benchmark_process.terminate_along_with_children() + memory_benchmark_process.wait() + + # Wait for the GPU Process to be done + if gpu_benchmark_process is not None: + gpu_benchmark_process.wait() + except KeyboardInterrupt: + log_debug("Detected Keyboard interrupt: cleaning up processes...") + kill_current_handles(current_proc_handles) + finally: + log_debug("Aborting: cleaning up processes...") + kill_current_handles(current_proc_handles) + + + # Cleanups if core: unlock_core(path_locked, path_cores, core) - if args.with_flowcept: + if with_flowcept: end_flowcept(flowcept, flowcept_task) - log_info(f"Benchmark {args.name} completed!") + log_info(f"{name} benchmark completed") +def main(): + # Parse command-line argument + parser = get_parser() + args = parser.parse_args() + + # Sanity checks + if not args.time_limit and (not args.cpu_work and not args.gpu_work): + log_error("At least one of --time-limit, --cpu-work, or --gpu-work should be provided.") + sys.exit(1) + + run(**vars(args)) + if __name__ == "__main__": main() diff --git a/docs/source/generating_workflow_benchmarks.rst b/docs/source/generating_workflow_benchmarks.rst index a980e2e0..46067634 100644 --- a/docs/source/generating_workflow_benchmarks.rst +++ b/docs/source/generating_workflow_benchmarks.rst @@ -35,9 +35,7 @@ recipe to generate a task graph. Once the task graph has been generated, each ta is set to be an instance of the workflow task benchmark. For each task, the following values for the parameters of the workflow task benchmark can be specified: -- :code:`cpu_work`: CPU work per workflow task. The :code:`cpu-benchmark` executable - (compiled C++) calculates an increasingly precise value of π up until the specified - total amount of computation (cpu_work) has been performed. +- :code:`cpu_work`: CPU work per workflow task. - :code:`data`: Individual data volumes for each task in a way that is coherent with respect to task data dependencies (in the form of a dictionary of input size files per workflow task type). Alternatively, a total data footprint (in MB) diff --git a/pyproject.toml b/pyproject.toml index 47a72e57..b0ef8dd3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "networkx", "numpy", "pandas", + "psutil", "python-dateutil", "requests", "scipy>=1.16.1", diff --git a/setup.py b/setup.py index a04c14d5..3ac2be74 100644 --- a/setup.py +++ b/setup.py @@ -16,32 +16,33 @@ from setuptools.command.build_ext import build_ext class Build(build_ext): - """Customized setuptools build command - builds cpu-benchmark on build.""" + """Customized setuptools build command""" def run(self): - # Try to build the cpu-benchmark, but make it optional - # This allows installation on Windows where make/g++ may not be available - try: - result = subprocess.call(["make"], stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) - if result != 0: - sys.stderr.write("Warning: 'make' build failed. cpu-benchmark will not be available.\n") - sys.stderr.write("This is expected on Windows. To build cpu-benchmark, install make and g++.\n") - except (FileNotFoundError, OSError): - sys.stderr.write("Warning: 'make' is not installed. cpu-benchmark will not be available.\n") - sys.stderr.write("This is expected on Windows. To build cpu-benchmark, install make and g++.\n") - super().run() + pass + # OLD CODE TO build a C++ executable + # # This allows installation on Windows where make/g++ may not be available + # try: + # result = subprocess.call(["make"], stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) + # if result != 0: + # sys.stderr.write("Warning: 'make' build failed. cpu-benchmark will not be available.\n") + # sys.stderr.write("This is expected on Windows. To build cpu-benchmark, install make and g++.\n") + # except (FileNotFoundError, OSError): + # sys.stderr.write("Warning: 'make' is not installed. cpu-benchmark will not be available.\n") + # sys.stderr.write("This is expected on Windows. To build cpu-benchmark, install make and g++.\n") + # super().run() # Conditionally include cpu-benchmark based on platform data_files = [] -if sys.platform != 'win32': - # On Unix-like systems (Linux, macOS, Docker), always try to include it - # The Build class will create it during the build process - data_files.append(('bin', ['bin/cpu-benchmark'])) -else: - # On Windows, only include if it exists (e.g., if user manually compiled it) - cpu_benchmark_path = 'bin/cpu-benchmark' - if os.path.exists(cpu_benchmark_path): - data_files.append(('bin', [cpu_benchmark_path])) +# if sys.platform != 'win32': +# # On Unix-like systems (Linux, macOS, Docker), always try to include it +# # The Build class will create it during the build process +# data_files.append(('bin', ['bin/cpu-benchmark'])) +# else: +# # On Windows, only include if it exists (e.g., if user manually compiled it) +# cpu_benchmark_path = 'bin/cpu-benchmark' +# if os.path.exists(cpu_benchmark_path): +# data_files.append(('bin', [cpu_benchmark_path])) setup( packages=find_packages(), diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 724425f8..bc9ae020 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -57,13 +57,8 @@ def _install_WfCommons_on_container(container): # Cleanup files that came from the host exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", user="wfcommons", stdout=True, stderr=True) exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", user="wfcommons", stdout=True, stderr=True) - # Clean up and force a rebuild of cpu-benchmark (because it may be compiled for the wrong architecture) - exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", user="wfcommons", stdout=True, - stderr=True) - exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", user="wfcommons", stdout=True, - stderr=True) - # Install WfCommons on the container (to install wfbench and cpu-benchmark really) + # Install WfCommons on the container (to install wfbench really) exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages", user="wfcommons", workdir="/tmp/WfCommons", stdout=True, stderr=True) @@ -96,28 +91,24 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command= user="wfcommons", privileged=True, tty=True, - detach=True + detach=True, + init=True # For zombies ) # Installing WfCommons on container _install_WfCommons_on_container(container) - # Copy over the wfbench and cpu-benchmark executables to where they should go on the container + # Copy over the wfbench executable to where they should go on the container if bin_dir: - sys.stderr.write(f"[{backend}] Copying wfbench and cpu-benchmark...\n") + sys.stderr.write(f"[{backend}] Copying wfbench...\n") exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir], user="wfcommons", stdout=True, stderr=True) if exit_code != 0: raise RuntimeError("Failed to copy wfbench script to the bin directory") - exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir], - user="wfcommons", - stdout=True, stderr=True) - if exit_code != 0: - raise RuntimeError("Failed to copy cpu-benchmark executable to the bin directory") else: - sys.stderr.write(f"[{backend}] Not Copying wfbench and cpu-benchmark...\n") + sys.stderr.write(f"[{backend}] Not Copying wfbench...\n") container.backend = backend return container diff --git a/tests/translators_loggers/Dockerfile.swiftt b/tests/translators_loggers/Dockerfile.swiftt index 98dc56a6..2bf9a12b 100644 --- a/tests/translators_loggers/Dockerfile.swiftt +++ b/tests/translators_loggers/Dockerfile.swiftt @@ -56,6 +56,7 @@ RUN conda tos accept --override-channels --channel https://repo.anaconda.com/pkg conda create -n swiftt-env python=3.11 -y RUN conda run -n swiftt-env python --version RUN conda run -n swiftt-env pip install flowcept +RUN conda run -n swiftt-env pip install pandas filelock RUN conda run -n swiftt-env pip install py-cpuinfo psutil redis RUN conda run -n swiftt-env conda install -y -c conda-forge gcc zsh zlib pathos RUN conda run -n swiftt-env conda install -y -c swift-t swift-t diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 2eccca46..f29304f4 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -101,6 +101,9 @@ def _additional_setup_swiftt(container): cmd=["bash", "-c", "redis-server"], user="wfcommons", detach=True, stdout=True, stderr=True) # Note that exit_code will always be None because of detach=True. + # Give redis time to start! + time.sleep(1) + # Check that the redis-server is up exit_code, output = container.exec_run( cmd=["bash", "-c", "redis-cli ping"], user="wfcommons", stdout=True, stderr=True) @@ -129,15 +132,19 @@ def _additional_setup_swiftt(container): def run_workflow_dask(container, num_tasks, str_dirpath): exit_code, output = container.exec_run("python ./dask_workflow.py", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) - assert (output.decode().count("completed!") == num_tasks) + if exit_code != 0: + print(output.decode()) + assert False + assert (output.decode().count("benchmark completed") == num_tasks) # TODO: Look at the (I think) generated run.json file on the container? def run_workflow_parsl(container, num_tasks, str_dirpath): exit_code, output = container.exec_run("python ./parsl_workflow.py", user="wfcommons", stdout=True, stderr=True) ignored, output = container.exec_run(f"cat {str_dirpath}/runinfo/000/parsl.log", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert ("completed" in output.decode()) assert (output.decode().count("_complete_task") == num_tasks) @@ -146,7 +153,9 @@ def run_workflow_nextflow(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(f"nextflow run ./workflow.nf --pwd .", user="wfcommons", stdout=True, stderr=True) ignored, task_exit_codes = container.exec_run("find . -name .exitcode -exec cat {} \;", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (task_exit_codes.decode() == num_tasks * "0") def run_workflow_airflow(container, num_tasks, str_dirpath): @@ -155,22 +164,28 @@ def run_workflow_airflow(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd=["sh", "-c", "cd /home/wfcommons/ && sudo /bin/bash /run_a_workflow.sh Blast-Benchmark"], user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (output.decode().count("completed") == num_tasks * 2) def run_workflow_bash(container, num_tasks, str_dirpath): # Run the workflow! exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) - assert (output.decode().count("completed") == num_tasks) + if exit_code != 0: + print(output.decode()) + assert False + assert (output.decode().count("benchmark completed") == num_tasks) def run_workflow_taskvine(container, num_tasks, str_dirpath): # Run the workflow! exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && python3 ./taskvine_workflow.py"], user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (output.decode().count("completed") == num_tasks) def run_workflow_makeflow(container, num_tasks, str_dirpath): @@ -178,7 +193,9 @@ def run_workflow_makeflow(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && makeflow --log-verbose --monitor=./monitor_data/ ./workflow.makeflow"], user="wfcommons", stdout=True, stderr=True) # Check sanity - assert (exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False num_completed_jobs = len(re.findall(r'job \d+ completed', output.decode())) assert (num_completed_jobs == num_tasks) @@ -187,10 +204,11 @@ def run_workflow_cwl(container, num_tasks, str_dirpath): # Note that the input file is hardcoded and Blast-specific exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ", user="wfcommons", stdout=True, stderr=True) - # print(output.decode()) # Check sanity - assert (exit_code == 0) - # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files", + if exit_code != 0: + print(output.decode()) + assert False + # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files", # and there is a 2* because there is a message for the job and for the step) assert (output.decode().count("completed success") == 3 + 2 *num_tasks) @@ -199,10 +217,11 @@ def run_workflow_streamflow(container, num_tasks, str_dirpath): # Note that the input file is hardcoded and Blast-specific exit_code, output = container.exec_run(cmd="streamflow run ./streamflow.yml", user="wfcommons", stdout=True, stderr=True) - # print(output.decode()) # Check sanity - assert (exit_code == 0) - # 2 extra "COMPLETED Step" ("COMPLETED Step /compile_output_files", "COMPLETED Step /compile_log_files") + if exit_code != 0: + print(output.decode()) + assert False + # 2 extra "COMPLETED Step" ("COMPLETED Step /compile_output_files", "COMPLETED Step /compile_log_files") assert (output.decode().count("COMPLETED Step") == num_tasks + 2) # Generate RO-Crate now that the workflow has completed (Fails for now) @@ -221,18 +240,20 @@ def run_workflow_pegasus(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd="bash /home/wfcommons/run_workflow.sh", user="wfcommons", stdout=True, stderr=True) # Check sanity - assert(exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert("success" in output.decode()) def run_workflow_swiftt(container, num_tasks, str_dirpath): # Run the workflow! exit_code, output = container.exec_run(cmd="swift-t workflow.swift", user="wfcommons", stdout=True, stderr=True) - # sys.stderr.write(output.decode()) # Check sanity - assert(exit_code == 0) + if exit_code != 0: + print(output.decode()) + assert False assert (output.decode().count("completed!") == num_tasks) - pass run_workflow_methods = { "dask": run_workflow_dask, @@ -270,18 +291,18 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - "swiftt", - "dask", - "parsl", - "nextflow", - "nextflow_subworkflow", - "airflow", - "bash", - "taskvine", - "makeflow", - "cwl", - "streamflow", - "pegasus", + "swiftt", + "dask", + "parsl", + "nextflow", + "nextflow_subworkflow", + "airflow", + "bash", + "taskvine", + "makeflow", + "cwl", + "streamflow", + "pegasus", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") @@ -330,7 +351,7 @@ def test_translator(self, backend) -> None: if backend == "pegasus": parser = PegasusLogsParser(dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001/") elif backend == "taskvine": - parser = TaskVineLogsParser(dirpath / "vine-run-info/most-recent/vine-logs", filenames_to_ignore=["cpu-benchmark","stress-ng", "wfbench"]) + parser = TaskVineLogsParser(dirpath / "vine-run-info/most-recent/vine-logs", filenames_to_ignore=["stress-ng", "wfbench"]) elif backend == "makeflow": parser = MakeflowLogsParser(execution_dir = dirpath, resource_monitor_logs_dir = dirpath / "monitor_data/") elif backend == "streamflow": @@ -351,3 +372,6 @@ def test_translator(self, backend) -> None: # Shutdown the container (weirdly, container is already shutdown by now... not sure how) _shutdown_docker_container_and_remove_image(container) + # Remove the created local directory + _remove_local_dir_if_it_exists(str_dirpath) + diff --git a/wfcommons/wfbench/bench.py b/wfcommons/wfbench/bench.py index 2f013546..7bee4eb0 100644 --- a/wfcommons/wfbench/bench.py +++ b/wfcommons/wfbench/bench.py @@ -86,6 +86,7 @@ def create_benchmark_from_synthetic_workflow( percent_cpu: Union[float, Dict[str, float]] = 0.6, cpu_work: Union[int, Dict[str, int]] = None, gpu_work: Union[int, Dict[str, int]] = None, + num_chunks: Optional[int] = 10, time: Optional[int] = None, mem: Optional[float] = None, lock_files_folder: Optional[pathlib.Path] = None, @@ -102,6 +103,8 @@ def create_benchmark_from_synthetic_workflow( :type cpu_work: Union[int, Dict[str, int]] :param gpu_work: Maximum GPU work per workflow task. :type gpu_work: Union[int, Dict[str, int]] + :param num_chunks: Number of chunks for pipelining I/O and computation for each task execution. + :type num_chunks: Optional[int] :param time: Time limit for running each task (in seconds). :type time: Optional[int] :param mem: Maximum amount of memory consumption per task (in MB). @@ -164,6 +167,7 @@ def create_benchmark_from_synthetic_workflow( task_percent_cpu, task_cpu_work, task_gpu_work, + num_chunks, time, task_memory, lock_files_folder, @@ -252,6 +256,7 @@ def create_benchmark(self, percent_cpu: Union[float, Dict[str, float]] = 0.6, cpu_work: Union[int, Dict[str, int]] = None, gpu_work: Union[int, Dict[str, int]] = None, + num_chunks: Optional[int] = 10, time: Optional[int] = None, data: Optional[int] = 0, mem: Optional[float] = None, @@ -269,6 +274,8 @@ def create_benchmark(self, :type cpu_work: Union[int, Dict[str, int]] :param gpu_work: GPU work per workflow task. :type gpu_work: Union[int, Dict[str, int]] + :param num_chunks: Number of chunks for pipelining I/O and computation for each task execution. + :type num_chunks: Optional[int] :param time: Time limit for running each task (in seconds). :type time: Optional[int] :param data: Total workflow data footprint (in MB). @@ -308,6 +315,7 @@ def create_benchmark(self, cpu_work, gpu_work, time, + num_chunks, mem, lock_files_folder, cores, @@ -367,6 +375,7 @@ def _set_argument_parameters(self, cpu_work: Union[int, Dict[str, int]], gpu_work: Union[int, Dict[str, int]], time: Optional[int], + num_chunks: Optional[int], mem: Optional[float], lock_files_folder: Optional[pathlib.Path], cores: Optional[pathlib.Path], @@ -381,6 +390,7 @@ def _set_argument_parameters(self, params.extend(cpu_params) gpu_params = self._generate_task_gpu_params(task, gpu_work) params.extend(gpu_params) + params.extend([f"--num-chunks {num_chunks}"]) if mem: params.extend([f"--mem {mem}"]) diff --git a/wfcommons/wfbench/translator/abstract_translator.py b/wfcommons/wfbench/translator/abstract_translator.py index 47c6afb2..435c18ac 100644 --- a/wfcommons/wfbench/translator/abstract_translator.py +++ b/wfcommons/wfbench/translator/abstract_translator.py @@ -94,7 +94,6 @@ def _copy_binary_files(self, output_folder: pathlib.Path) -> None: bin_folder.mkdir(exist_ok=True) shutil.copy(shutil.which("wfbench"), bin_folder) - shutil.copy(shutil.which("cpu-benchmark"), bin_folder) def _generate_input_files(self, output_folder: pathlib.Path) -> None: """ diff --git a/wfcommons/wfbench/translator/pegasus.py b/wfcommons/wfbench/translator/pegasus.py index 33c9aabc..348fd5b5 100644 --- a/wfcommons/wfbench/translator/pegasus.py +++ b/wfcommons/wfbench/translator/pegasus.py @@ -65,7 +65,6 @@ def translate(self, output_folder: pathlib.Path, tasks_priorities: Optional[Dict " is_stageable=True)\n" \ "transformation.add_env(PATH='/usr/bin:/bin:.')\n" \ "transformation.add_profiles(Namespace.CONDOR, 'request_disk', '10')\n" \ - "transformation.add_requirement(t_cpu_benchmark)\n" \ "tc.add_transformations(transformation)\n\n" # adding tasks diff --git a/wfcommons/wfbench/translator/swift_t.py b/wfcommons/wfbench/translator/swift_t.py index 610beb18..407b4afa 100644 --- a/wfcommons/wfbench/translator/swift_t.py +++ b/wfcommons/wfbench/translator/swift_t.py @@ -74,7 +74,7 @@ def translate(self, output_folder: pathlib.Path) -> None: self.logger.debug("Defining input files") in_count = 0 self.output_folder = output_folder - self.cpu_benchmark = output_folder.joinpath("./bin/cpu-benchmark").absolute() + self.wfbench = output_folder.joinpath("./bin/wfbench").absolute() self.script = f"string fs = sprintf(flowcept_start, \"{self.workflow.workflow_id}\");\nstring fss = python_persist(fs);\n\n" if self.workflow.workflow_id else "" self.script += "string root_in_files[];\n" @@ -116,6 +116,9 @@ def translate(self, output_folder: pathlib.Path) -> None: self._copy_binary_files(output_folder) self._generate_input_files(output_folder) + # README file + self._write_readme_file(output_folder) + def _find_categories_list(self, task_name: str, parent_task: Optional[str] = None) -> None: """" Find list of task categories ordered by task dependencies. @@ -218,7 +221,7 @@ def _add_tasks(self, category: str) -> None: self.script += f"foreach i in [0:{num_tasks - 1}] {{\n" \ f" string of = sprintf(\"{self.output_folder.absolute()}/data/{category}_%i_output.txt\", i);\n" \ f" string task_id = \"{category}_\" + i;\n" \ - f" string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", task_id, {args});\n" \ + f" string cmd_{self.cmd_counter} = sprintf(command, \"{self.wfbench}\", task_id, {args});\n" \ f" string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \ f" string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \ f" {category}__out[i] = string2int(of_{self.cmd_counter});\n" \ @@ -232,9 +235,22 @@ def _add_tasks(self, category: str) -> None: self.out_files.add(out_file) args = args.replace( ", of", f", \"{out_file}\"").replace("[i]", "[0]") - self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{self.cpu_benchmark}\", \"{category}_{self.cmd_counter}\", {args});\n" \ + self.script += f"string cmd_{self.cmd_counter} = sprintf(command, \"{self.wfbench}\", \"{category}_{self.cmd_counter}\", {args});\n" \ f"string co_{self.cmd_counter} = python_persist(cmd_{self.cmd_counter});\n" \ f"string of_{self.cmd_counter} = sprintf(\"0%s\", co_{self.cmd_counter});\n" \ f"{category}__out[0] = string2int(of_{self.cmd_counter});\n\n" self.cmd_counter += 1 + + def _write_readme_file(self, output_folder: pathlib.Path) -> None: + """ + Write the README file. + + :param output_folder: The path of the output folder. + :type output_folder: pathlib.Path + """ + readme_file_path = output_folder.joinpath("README") + with open(readme_file_path, "w") as out: + out.write(f"Start a REDIS server: redis-server\n") + out.write(f"[Optional] Check that REDIS works: redis-cli ping (it should say \"PONG\")\n") + out.write(f"Run the workflow: swift-t workflow.swift\n") \ No newline at end of file diff --git a/wfcommons/wfbench/translator/taskvine.py b/wfcommons/wfbench/translator/taskvine.py index 2e3bcbaa..e1179eb0 100644 --- a/wfcommons/wfbench/translator/taskvine.py +++ b/wfcommons/wfbench/translator/taskvine.py @@ -118,7 +118,6 @@ def _add_task(self, task_name: str, parent_task: Optional[str] = None) -> list[s f_counter = 1 task_script = f"t_{self.task_counter}.add_poncho_package(poncho_pkg)\n" \ f"t_{self.task_counter}.add_input(wfbench, 'wfbench')\n" \ - f"t_{self.task_counter}.add_input(cpu_bench, 'cpu-benchmark')\n" \ f"t_{self.task_counter}.add_input(stress_ng, 'stress-ng')\n" input_spec = "\"[" for file in task.input_files: diff --git a/wfcommons/wfbench/translator/templates/pegasus_template.py b/wfcommons/wfbench/translator/templates/pegasus_template.py index fc956660..39bd4b21 100644 --- a/wfcommons/wfbench/translator/templates/pegasus_template.py +++ b/wfcommons/wfbench/translator/templates/pegasus_template.py @@ -22,9 +22,9 @@ def which(file): tc = TransformationCatalog() rc = ReplicaCatalog() -t_cpu_benchmark = Transformation('cpu-benchmark', site='local', -pfn = os.getcwd() + '/bin/cpu-benchmark', is_stageable=True) -tc.add_transformations(t_cpu_benchmark) +# t_cpu_benchmark = Transformation('cpu-benchmark', site='local', +# pfn = os.getcwd() + '/bin/cpu-benchmark', is_stageable=True) +# tc.add_transformations(t_cpu_benchmark) transformation_path = os.getcwd() + '/bin/wfbench' task_output_files = {} diff --git a/wfcommons/wfbench/translator/templates/swift_t/workflow.swift b/wfcommons/wfbench/translator/templates/swift_t/workflow.swift index 6097cbc3..1b740f12 100644 --- a/wfcommons/wfbench/translator/templates/swift_t/workflow.swift +++ b/wfcommons/wfbench/translator/templates/swift_t/workflow.swift @@ -25,7 +25,7 @@ logging.basicConfig( handlers=[logging.StreamHandler()] ) -workflow_id = "%s" +workflow_id = "%s" workflow_name = "%s" out_files = [%s] @@ -63,12 +63,12 @@ string command = """ import logging import os +import sys import pathlib import signal import socket import subprocess import time -from pathos.helpers import mp as multiprocessing __import__("logging").basicConfig( level=logging.INFO, @@ -77,14 +77,15 @@ __import__("logging").basicConfig( handlers=[logging.StreamHandler()] ) -cpu_benchmark = "%s" +wfbench = "%s" task_name = "%s" -files_list = ["%s"] +input_file = ["%s"] gpu_work = int(%i) cpu_work = int(%i) percent_cpu = %f cpu_threads = int(10 * percent_cpu) -output_data = {"%s": int(%i)} +output_file = "%s" +output_file_size = int(%i) dep = %i workflow_id = "%s" task_id = f"{workflow_id}_{task_name}" @@ -106,131 +107,31 @@ if 'workflow_id': __import__("logging").info(f"Starting {task_name} Benchmark on {socket.gethostname()}") -procs = [] -cpu_queue = multiprocessing.Queue() -__import__("logging").debug(f"Working directory: {os.getcwd()}") - -__import__("logging").debug("Starting IO benchmark...") -io_proc = None -termination_event = multiprocessing.Event() - -io_proc = multiprocessing.Process( - target=lambda inputs=files_list, outputs=output_data, cpu_queue=cpu_queue, - termination_event=termination_event: ( - memory_limit := 10 * 1024 * 1024, - [open(name, "wb").close() for name in outputs], - io_completed := 0, - bytes_read := {name: 0 for name in inputs}, - bytes_written := {name: 0 for name in outputs}, - input_sizes := {name: __import__("os").path.getsize(name) for name in inputs}, - [ - ( - cpu_percent := cpu_queue.get(timeout=1.0), - should_exit := termination_event.is_set(), - ( - while_loop_var := True, - [ - ( - new_val := ( - cpu_queue.get(timeout = 1.0) - if not cpu_queue.empty() else None - ), - cpu_percent := ( - max(cpu_percent, new_val) - if new_val is not None else cpu_percent - ), - while_loop_var := ( - new_val is not None and not cpu_queue.empty() - ) - ) - for _ in range(100) if while_loop_var - ], - bytes_to_read := { - name: max(0, int(size * (cpu_percent / 100) - bytes_read[name])) - for name, size in input_sizes.items() - }, - bytes_to_write := { - name: max(0, int(size * (cpu_percent / 100) - bytes_written[name])) - for name, size in outputs.items() - }, - __import__("logging").debug("Starting IO Read Benchmark..."), - in_file := list(bytes_to_read.keys())[0], - in_size := list(bytes_to_read.values())[0], - open(in_file, "rb").read(int(in_size)), - __import__("logging").debug("Completed IO Read Benchmark!"), - out_file := list(outputs.keys())[0], - out_size := list(outputs.values())[0], - __import__("logging").debug(f"Writing output file '{out_file}'"), - open(out_file, "ab").write(__import__("os").urandom(int(out_size))), - bytes_read.update({ - name: bytes_read[name] + bytes_to_read[name] - for name in bytes_to_read - }), - bytes_written.update({ - name: bytes_written[name] + bytes_to_write[name] - for name in bytes_to_write - }), - - __import__("logging").debug(f"Bytes Read: {bytes_read}"), - __import__("logging").debug(f"Bytes Written: {bytes_written}"), - io_completed := cpu_percent, - ) if cpu_percent is not None else time.sleep(0.1), - not (should_exit or io_completed >= 100) - ) - for _ in range(1000000) - if not (io_completed >= 100 or termination_event.is_set()) - ], - __import__("logging").info("IO benchmark completed") - ) +from importlib.util import spec_from_file_location, module_from_spec +from importlib.machinery import SourceFileLoader +loader = SourceFileLoader("wfbench", wfbench) +spec = spec_from_file_location("wfbench", wfbench, loader=loader) +mod = module_from_spec(spec) +spec.loader.exec_module(mod) +mod.run( + name=task_name, + workflow_id=workflow_id, + percent_cpu=percent_cpu, + cpu_work=cpu_work, + mem=None, + gpu_work=gpu_work, + output_files=f'{{"{output_file}": {output_file_size}}}', + input_files=str(input_file).replace("'", '"'), + with_flowcept=True, + silent=False, + debug=False, + rundir=None, + path_lock=None, + path_cores=None, + time_limit=None, + num_chunks=10, ) -io_proc.start() -procs.append(io_proc) - -if cpu_work > 0: - __import__("logging").info(f"Starting CPU and Memory Benchmarks for {task_name}...") - - mem_threads = 10 - cpu_threads - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{cpu_benchmark}", f"{cpu_work_per_thread}"] - mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", "0.05%%", "--vm-keep"] - - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - cpu_procs.append(cpu_proc) - monitor_thread = multiprocessing.Process( - target=lambda proc=cpu_proc, queue=cpu_queue: - [ - queue.put(float(line.strip().split()[1].strip('%%'))) - for line in iter(proc.stdout.readline, "") - if line.strip() and line.strip().startswith("Progress:") - ] - ) - monitor_thread.start() - - if mem_threads > 0: - mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) - mem_procs.append(mem_proc) - - procs.extend(cpu_procs) - for proc in procs: - if isinstance(proc, subprocess.Popen): - proc.wait() - if io_proc is not None and io_proc.is_alive(): - io_proc.join() - - for mem_proc in mem_procs: - try: - os.kill(mem_proc.pid, signal.SIGKILL) - except subprocess.TimeoutExpired: - __import__("logging").debug("Memory process did not terminate; force-killing.") - subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() - - __import__("logging").info("Completed CPU and Memory Benchmarks!") - + __import__("logging").info(f"Benchmark {task_name} completed!") if 'workflow_id': diff --git a/wfcommons/wfbench/translator/templates/taskvine_template.py b/wfcommons/wfbench/translator/templates/taskvine_template.py index 496ae82f..48b76ea2 100644 --- a/wfcommons/wfbench/translator/templates/taskvine_template.py +++ b/wfcommons/wfbench/translator/templates/taskvine_template.py @@ -46,7 +46,6 @@ def wait_for_tasks_completion(): # wfbench executable files wfbench = m.declare_file("bin/wfbench", cache="workflow") -cpu_bench = m.declare_file("bin/cpu-benchmark", cache="workflow") stress_ng = m.declare_file(shutil.which("stress-ng"), cache="workflow") # Generated code goes here diff --git a/wfcommons/wfinstances/logs/taskvine.py b/wfcommons/wfinstances/logs/taskvine.py index 2003d853..136872d9 100644 --- a/wfcommons/wfinstances/logs/taskvine.py +++ b/wfcommons/wfinstances/logs/taskvine.py @@ -45,7 +45,7 @@ class TaskVineLogsParser(LogsParser): are input to tasks. This argument is the list of names of files that should be ignored in the reconstructed instances, which typically do not include such files at task input. For instance, if reconstructing a workflow from an execution - of a WfBench-generated benchmark, one could pass ["wfbench", "cpu-benchmark", "stress-ng"] + of a WfBench-generated benchmark, one could pass ["wfbench", "stress-ng"] :type filenames_to_ignore: List[str] :param description: Workflow instance description. :type description: Optional[str]