Add goldenrun-only and missing-only flags #78

Merged
157 changes: 150 additions & 7 deletions controller.py
@@ -19,8 +19,9 @@
import argparse
import hashlib
import logging
from multiprocessing import Manager, Process
from multiprocessing import Manager, Process, Value
from pathlib import Path
import signal
import subprocess
import sys
import tables
@@ -47,6 +48,24 @@

clogger = logging.getLogger(__name__)

stop_signal_received = Value("i", 0)


def signal_handler(signum, frame):
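    # Only record that a stop was requested; the controller loop and the
    # logger process poll this shared flag and shut down gracefully.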
global stop_signal_received
stop_signal_received.value = 1


def register_signal_handlers():
signal.signal(
signal.SIGTERM,
signal_handler,
)
signal.signal(
signal.SIGINT,
signal_handler,
)


def build_ranges_dict(fault_dict):
"""
@@ -468,6 +487,60 @@ def read_backup(hdf5_file):
return [backup_expanded_faults, backup_config, backup_goldenrun]


def read_simulated_faults(hdf5_file):
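    """
    Collect a fingerprint for every experiment already stored in the HDF5
    file (the concatenation of str(Fault) for each fault in the experiment),
    so that a re-run can skip configurations that were already simulated.
    """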
with tables.open_file(hdf5_file, "r") as f_in:
# Process simulated faults
simulated_faults_hash = set()
exp_n = 0

for exp in tqdm(
f_in.root.fault,
total=f_in.root.fault._v_nchildren,
desc="Reading simulated faults",
):
simulated_exp = {
"index": exp_n,
"faultlist": [
Fault(
fault["fault_address"],
[],
fault["fault_type"],
fault["fault_model"],
fault["fault_lifespan"],
fault["fault_mask"],
fault["trigger_address"],
fault["trigger_hitcounter"],
fault["fault_num_bytes"],
fault["fault_wildcard"],
)
for fault in exp.faults.iterrows()
],
}

config_string = ""
for fault in simulated_exp["faultlist"]:
config_string += str(fault)
simulated_faults_hash.add(config_string)

exp_n = exp_n + 1

return simulated_faults_hash


def get_not_simulated_faults(faultlist, simulated_faults):
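    """
    Return the subset of faultlist whose fingerprints are not contained in
    the set of already-simulated fault configurations.
    """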
missing_faultlist = []

for faultconfig in faultlist:
config_string = ""
for fault in faultconfig["faultlist"]:
config_string += str(fault)

if config_string not in simulated_faults:
missing_faultlist.append(faultconfig)

return missing_faultlist


def controller(
args,
hdf5mode,
@@ -476,6 +549,8 @@ def controller(
num_workers,
queuedepth,
compressionlevel,
missing_only,
goldenrun_only,
goldenrun=True,
logger=hdf5collector,
qemu_pre=None,
@@ -535,7 +610,10 @@
)
return config_qemu

clogger.info("Backup matched and will be used")
clogger.info("Backup matched")

if goldenrun_only:
return config_qemu

faultlist = backup_expanded_faultlist
config_qemu["max_instruction_count"] = backup_config["max_instruction_count"]
@@ -561,13 +639,35 @@
log_config = False
log_goldenrun = False

if goldenrun_only:
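        # No fault experiments are scheduled; only the config and the
        # goldenrun results are logged.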
faultlist = []
overwrite_faults = False

log_config = True
log_goldenrun = True

if missing_only:
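        # Simulate only the fault configs not yet present in the HDF5 file,
        # and keep the file's existing contents (no overwrite, no re-logging
        # of config or goldenrun).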
simulated_faults = read_simulated_faults(hdf5_file)
faultlist = get_not_simulated_faults(faultlist, simulated_faults)

log_config = False
log_goldenrun = False

overwrite_faults = False

if faultlist:
clogger.info(f"{len(faultlist)} faults are missing and will be simulated")
else:
clogger.info("All faults are already simulated")

p_logger = Process(
target=logger,
args=(
hdf5path,
hdf5mode,
queue_output,
len(faultlist),
stop_signal_received,
compressionlevel,
logger_postprocess,
log_config,
@@ -598,9 +698,23 @@ def controller(
continue
goldenrun_data[keyword] = pd.DataFrame(goldenrun_data[keyword])

pbar = tqdm(total=len(faultlist), desc="Simulating faults")
    # Register handlers for a graceful exit when a stop signal arrives
register_signal_handlers()

itter = 0
while 1:
if stop_signal_received.value == 1:
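            # Let the logger finish its current write, then kill the
            # remaining qemu workers before leaving the scheduling loop.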
clogger.info(
"Stop signal received, finishing the current write operation..."
)

p_logger.join()

for p in p_list:
p["process"].kill()

break

if len(p_list) == 0 and itter == len(faultlist):
clogger.debug("Done inserting qemu jobs")
break
@@ -671,8 +785,6 @@ def controller(
# Find finished processes
p["process"].join(timeout=0)
if p["process"].is_alive() is False:
# Update the progress bar
pbar.update(1)
# Recalculate moving average
p_time_list.append(current_time - p["start_time"])
len_p_time_list = len(p_time_list)
@@ -685,7 +797,6 @@ def controller(
break

clogger.debug("{} experiments remaining in queue".format(queue_output.qsize()))
pbar.close()
p_logger.join()

clogger.debug("Done with qemu and logger")
@@ -697,7 +808,11 @@ def controller(
"Took {}:{}:{} to complete all experiments".format(int(h), int(m), int(s))
)

tperindex = (t1 - t0) / len(faultlist)
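    # An empty faultlist (e.g. with --goldenrun-only) must not divide by zero.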
if faultlist:
tperindex = (t1 - t0) / len(faultlist)
else:
tperindex = t1 - t0

tperworker = tperindex / num_workers
clogger.debug(
"Took average of {}s per fault, python worker rough runtime is {}s".format(
@@ -787,6 +902,19 @@ def get_argument_parser():
action="store_true",
required=False,
)
parser.add_argument(
"--goldenrun-only",
help="Only run goldenrun",
action="store_true",
required=False,
)
parser.add_argument(
"--missing-only",
"-m",
help="Only run missing experiments",
action="store_true",
required=False,
)
return parser


@@ -823,6 +951,19 @@ def process_arguments(args):
)
exit(1)

if args.goldenrun_only:
parguments["goldenrun_only"] = True
parguments["goldenrun"] = True
else:
parguments["goldenrun_only"] = False

if args.missing_only and hdf5file.is_file():
parguments["missing_only"] = True
parguments["hdf5mode"] = "a"
parguments["goldenrun"] = False
else:
parguments["missing_only"] = False

qemu_conf = json.load(args.qemu)
args.qemu.close()
print(qemu_conf)
@@ -930,6 +1071,8 @@ def init_logging():
parguments["num_workers"], # num_workers
parguments["queuedepth"], # queuedepth
parguments["compressionlevel"], # compressionlevel
parguments["missing_only"], # missing_only flag
parguments["goldenrun_only"], # goldenrun_only flag
parguments["goldenrun"], # goldenrun
hdf5collector, # logger
None, # qemu_pre
13 changes: 13 additions & 0 deletions faultclass.py
@@ -123,6 +123,19 @@ def __init__(
self.num_bytes = num_bytes
self.wildcard = wildcard

def __str__(self):
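        # Concatenate all fault fields into one string; controller.py uses
        # this as a fingerprint to detect already-simulated configurations.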
return (
f"{self.trigger.address}"
f"{self.trigger.hitcounter}"
f"{self.address}"
f"{self.type}"
f"{self.model}"
f"{self.lifespan}"
f"{self.mask}"
f"{self.num_bytes}"
f"{self.wildcard}"
)


def write_fault_list_to_pipe(fault_list, fifo):
fault_pack = fault_pb2.FaultPack()
24 changes: 22 additions & 2 deletions hdf5logger.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import signal
import logging
import time

@@ -25,6 +26,14 @@
logger = logging.getLogger(__name__)


def register_signal_handlers():
    """
    Ignore SIGINT and SIGTERM; shutdown is coordinated by controller.py
    through the shared stop_signal flag instead.
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)


# Tables for storing the elements from queue
class translation_block_exec_table(tables.IsDescription):
tb = tables.UInt64Col()
@@ -431,7 +440,7 @@ def process_config(f, configgroup, exp, myfilter):
endtable.close()


def process_backup(f, configgroup, exp, myfilter):
def process_backup(f, configgroup, exp, myfilter, stop_signal):
process_config(f, configgroup, exp["config"], myfilter)

fault_expanded_group = f.create_group(
@@ -444,6 +453,9 @@ def process_backup(f, configgroup, exp, myfilter):
for exp_number in tqdm(
range(len(exp["expanded_faultlist"])), desc="Creating backup"
):
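        # Abort the backup early if a stop signal was received.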
if stop_signal.value == 1:
break

exp_group = f.create_group(
fault_expanded_group, exp_name.format(exp_number), "Group containing faults"
)
@@ -463,12 +475,15 @@ def hdf5collector(
mode,
queue_output,
num_exp,
stop_signal,
compressionlevel,
logger_postprocess=None,
log_goldenrun=True,
log_config=False,
overwrite_faults=False,
):
register_signal_handlers()

prctl.set_name("logger")
prctl.set_proctitle("logger")
f = tables.open_file(hdf5path, mode, max_group_width=65536)
@@ -491,7 +506,10 @@ def hdf5collector(
):
n._f_remove(recursive=True)

pbar = tqdm(total=num_exp, desc="Simulating faults", disable=not num_exp)
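    # The progress bar is driven here, one tick per logged experiment; it is
    # disabled when there are no fault experiments to simulate.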
while num_exp > 0 or log_goldenrun or log_pregoldenrun or log_config:
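        # Stop logging once the controller signals shutdown; the file is
        # closed below, so everything written so far remains intact.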
if stop_signal.value == 1:
break
# readout queue and get next output from qemu. Will block
exp = queue_output.get()
t1 = time.time()
@@ -514,6 +532,7 @@ def hdf5collector(
)
)
num_exp = num_exp - 1
pbar.update(1)
elif exp["index"] == -2 and log_pregoldenrun:
if "Pregoldenrun" in f.root:
raise ValueError("Pregoldenrun already exists!")
@@ -537,7 +556,7 @@ def hdf5collector(
"/", "Backup", "Group containing backup and run information"
)

process_backup(f, exp_group, exp, myfilter)
process_backup(f, exp_group, exp, myfilter, stop_signal)
log_config = False
continue
else:
@@ -567,5 +586,6 @@ def hdf5collector(

del exp

pbar.close()
f.close()
logger.debug("Data Logging done")