570 lines
19 KiB
Python
Executable file
570 lines
19 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
#
|
|
# Copyright (C) 2020 - 2022 Collabora Limited
|
|
# Authors:
|
|
# Gustavo Padovan <gustavo.padovan@collabora.com>
|
|
# Guilherme Gallo <guilherme.gallo@collabora.com>
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
"""Send a job to LAVA, track it and collect log back"""
|
|
|
|
|
|
import argparse
|
|
import contextlib
|
|
import pathlib
|
|
import re
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import urllib.parse
|
|
import xmlrpc.client
|
|
from datetime import datetime, timedelta
|
|
from os import getenv
|
|
from typing import Any, Optional
|
|
|
|
import lavacli
|
|
import yaml
|
|
from lava.exceptions import (
|
|
MesaCIException,
|
|
MesaCIKnownIssueException,
|
|
MesaCIParseException,
|
|
MesaCIRetryError,
|
|
MesaCITimeoutError,
|
|
)
|
|
from lava.utils import CONSOLE_LOG
|
|
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS
|
|
from lava.utils import (
|
|
GitlabSection,
|
|
LogFollower,
|
|
LogSectionType,
|
|
fatal_err,
|
|
hide_sensitive_data,
|
|
print_log,
|
|
)
|
|
from lavacli.utils import loader
|
|
|
|
# All of the tunables below can be overridden through environment variables
# (the LAVA_* names passed to getenv); the second getenv argument is the
# fallback used when the variable is unset.

# Timeout in seconds to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5*60))

# How many seconds the script should wait before try a new polling iteration to
# check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 10))

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happen.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2))

# How many attempts should be made when a timeout happen during LAVA device boot.
NUMBER_OF_ATTEMPTS_LAVA_BOOT = int(getenv("LAVA_NUMBER_OF_ATTEMPTS_LAVA_BOOT", 3))
|
|
|
|
|
|
def generate_lava_yaml(args):
    """Build the LAVA job definition for this CI job and return it as YAML.

    The definition contains three ordered actions:
      1. deploy: kernel over TFTP and rootfs over NFS, both produced by the
         base container build
      2. boot: always over NFS
      3. test: a single inline 'test' that runs the Mesa CI scripts

    :param args: parsed command-line options, see create_parser()
    :return: the LAVA job definition serialized as a YAML string
    """
    # General metadata and permissions, plus also inexplicably kernel arguments
    values = {
        'job_name': 'mesa: {}'.format(args.pipeline_info),
        'device_type': args.device_type,
        'visibility': { 'group': [ args.visibility_group ] },
        'priority': 75,
        'context': {
            'extra_nfsroot_args': ' init=/init rootwait usbcore.quirks=0bda:8153:k'
        },
        "timeouts": {
            "job": {"minutes": args.job_timeout},
            "action": {"minutes": 3},
            "actions": {
                # Give the boot action a budget proportional to the number of
                # boot attempts we allow.
                "depthcharge-action": {
                    "minutes": 3 * NUMBER_OF_ATTEMPTS_LAVA_BOOT,
                }
            }
        },
    }

    if args.lava_tags:
        values['tags'] = args.lava_tags.split(',')

    # URLs to our kernel rootfs to boot from, both generated by the base
    # container build
    deploy = {
        'timeout': { 'minutes': 10 },
        'to': 'tftp',
        'os': 'oe',
        'kernel': {
            'url': '{}/{}'.format(args.kernel_url_prefix, args.kernel_image_name),
        },
        'nfsrootfs': {
            'url': '{}/lava-rootfs.tar.zst'.format(args.rootfs_url_prefix),
            'compression': 'zstd',
        }
    }
    if args.kernel_image_type:
        deploy['kernel']['type'] = args.kernel_image_type
    if args.dtb:
        deploy['dtb'] = {
            'url': '{}/{}.dtb'.format(args.kernel_url_prefix, args.dtb)
        }

    # always boot over NFS
    boot = {
        "failure_retry": NUMBER_OF_ATTEMPTS_LAVA_BOOT,
        "method": args.boot_method,
        "commands": "nfs",
        "prompts": ["lava-shell:"],
    }

    # job execution script:
    #   - inline .gitlab-ci/common/init-stage1.sh
    #   - fetch and unpack per-pipeline build artifacts from build job
    #   - fetch and unpack per-job environment from lava-submit.sh
    #   - exec .gitlab-ci/common/init-stage2.sh
    #
    # NOTE: the steps are fully assembled *before* they are embedded in the
    # test definition below. The previous version embedded an empty list and
    # then grew it via in-place `+=` aliasing, which was fragile and easy to
    # break accidentally.
    run_steps = []
    with open(args.first_stage_init, 'r') as init_sh:
        # Drop comments and blank lines from the first-stage init script.
        run_steps += [ x.rstrip() for x in init_sh if not x.startswith('#') and x.rstrip() ]

    if args.jwt_file:
        with open(args.jwt_file) as jwt_file:
            run_steps += [
                "set +x",
                f'echo -n "{jwt_file.read()}" > "{args.jwt_file}" # HIDEME',
                "set -x",
                f'echo "export CI_JOB_JWT_FILE={args.jwt_file}" >> /set-job-env-vars.sh',
            ]
    else:
        run_steps += [
            "echo Could not find jwt file, disabling MINIO requests...",
            "sed -i '/MINIO_RESULTS_UPLOAD/d' /set-job-env-vars.sh",
        ]

    run_steps += [
        'mkdir -p {}'.format(args.ci_project_dir),
        'wget -S --progress=dot:giga -O- {} | tar --zstd -x -C {}'.format(args.build_url, args.ci_project_dir),
        'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url),

        # Sleep a bit to give time for bash to dump shell xtrace messages into
        # console which may cause interleaving with LAVA_SIGNAL_STARTTC in some
        # devices like a618.
        'sleep 1',

        # Putting CI_JOB name as the testcase name, it may help LAVA farm
        # maintainers with monitoring
        f"lava-test-case 'mesa-ci_{args.mesa_job_name}' --shell /init-stage2.sh",
    ]

    # skeleton test definition: only declaring each job as a single 'test'
    # since LAVA's test parsing is not useful to us
    test = {
        'timeout': { 'minutes': args.job_timeout },
        'failure_retry': 1,
        'definitions': [ {
            'name': 'mesa',
            'from': 'inline',
            'lava-signal': 'kmsg',
            'path': 'inline/mesa.yaml',
            'repository': {
                'metadata': {
                    'name': 'mesa',
                    'description': 'Mesa test plan',
                    'os': [ 'oe' ],
                    'scope': [ 'functional' ],
                    'format': 'Lava-Test Test Definition 1.0',
                },
                'run': {
                    "steps": run_steps
                },
            },
        } ],
    }

    values['actions'] = [
        { 'deploy': deploy },
        { 'boot': boot },
        { 'test': test },
    ]

    # Huge width keeps long shell commands from being wrapped by the YAML
    # emitter, which would corrupt them.
    return yaml.dump(values, width=10000000)
|
|
|
|
|
|
def setup_lava_proxy():
    """Create an XML-RPC proxy from the 'default' lavacli configuration.

    Returns a ServerProxy pointing at the LAVA server, authenticated with
    the username/token taken from the lavacli config file.
    """
    config = lavacli.load_config("default")
    usr = config.get("username")
    tok = config.get("token")
    uri_obj = urllib.parse.urlparse(config.get("uri"))

    # Embed the credentials directly into the URI, as ServerProxy expects.
    uri_str = f"{uri_obj.scheme}://{usr}:{tok}@{uri_obj.netloc}{uri_obj.path}"

    transport = lavacli.RequestsTransport(
        uri_obj.scheme,
        config.get("proxy"),
        config.get("timeout", 120.0),
        config.get("verify_ssl_cert", True),
    )
    proxy = xmlrpc.client.ServerProxy(uri_str, allow_none=True, transport=transport)

    print_log(f"Proxy for {config['uri']} created.")

    return proxy
|
|
|
|
|
|
def _call_proxy(fn, *args):
|
|
retries = 60
|
|
for n in range(1, retries + 1):
|
|
try:
|
|
return fn(*args)
|
|
except xmlrpc.client.ProtocolError as err:
|
|
if n == retries:
|
|
traceback.print_exc()
|
|
fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg))
|
|
else:
|
|
time.sleep(15)
|
|
except xmlrpc.client.Fault as err:
|
|
traceback.print_exc()
|
|
fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))
|
|
|
|
|
|
class LAVAJob:
    """Tracks a single LAVA job: submission, state polling and log retrieval."""

    # Console color applied to each final job status when reporting it.
    COLOR_STATUS_MAP = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_YELLOW"],
        "fail": CONSOLE_LOG["FG_RED"],
        "canceled": CONSOLE_LOG["FG_MAGENTA"],
    }

    def __init__(self, proxy, definition):
        # XML-RPC proxy used for every scheduler call.
        self.proxy = proxy
        # Job definition in YAML, as produced by generate_lava_yaml().
        self.definition = definition
        # Set by submit(); stays None while the job is not submitted.
        self.job_id = None
        # Index of the next log line to fetch from the scheduler.
        self.last_log_line = 0
        # Timestamp of the last observed log activity, see heartbeat().
        self.last_log_time = None
        self.is_finished = False
        self.status = "created"

    def heartbeat(self):
        """Record that the job showed signs of life just now."""
        self.last_log_time = datetime.now()
        self.status = "running"

    def validate(self) -> Optional[dict]:
        """Ask the scheduler to validate the job definition.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return _call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def submit(self):
        """Submit the job; returns True on success, False on a Mesa CI error."""
        try:
            self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
        except MesaCIException:
            return False
        return True

    def cancel(self):
        """Cancel the job on the scheduler, if it was ever submitted."""
        if not self.job_id:
            return
        self.proxy.scheduler.jobs.cancel(self.job_id)

    def is_started(self) -> bool:
        """True once the job left the scheduler queue and began running."""
        state: dict[str, str] = _call_proxy(
            self.proxy.scheduler.job_state, self.job_id
        )
        return state["job_state"] not in ("Submitted", "Scheduling", "Scheduled")

    def _load_log_from_data(self, data) -> list[str]:
        """Decode one RPC log payload and advance the log-line cursor."""
        # When there is no new log data, the YAML document is empty.
        parsed = yaml.load(str(data), Loader=loader(False))
        lines = parsed if parsed else []
        self.last_log_line += len(lines)
        return lines

    def get_logs(self) -> list[str]:
        """Fetch any new log lines; raises MesaCIParseException on failure
        (including corrupted YAML payloads)."""
        try:
            finished, data = _call_proxy(
                self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
            )
            self.is_finished = finished
            return self._load_log_from_data(data)
        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(
        self, lava_lines: list[dict[str, str]]
    ) -> list[dict[str, str]]:
        """Use the console log to catch whether the job completed successfully
        or not. Returns the log lines up to and including the result line."""
        cutoff = None  # slice end: None keeps every line
        for position, log_line in enumerate(lava_lines):
            result = re.search(r"hwci: mesa: (pass|fail)", log_line)
            if not result:
                continue
            # The hwci script has finished; everything past here is noise.
            self.is_finished = True
            self.status = result.group(1)
            cutoff = position + 1
            break
        return lava_lines[:cutoff]
|
|
|
|
|
|
def find_exception_from_metadata(metadata, job_id):
    """Inspect a single LAVA test-result metadata dict for retryable errors.

    Raises MesaCIException when the metadata points at a LAVA infrastructure
    problem, a LAVA JobError, or a validation failure. Returns None for
    non-failed results, and the metadata itself for unrecognized failures.
    """
    if metadata.get("result") != "fail":
        return

    error_type = metadata.get("error_type")
    if error_type == "Infrastructure":
        raise MesaCIException(
            f"LAVA job {job_id} failed with Infrastructure Error. Retry."
        )
    if error_type == "Job":
        # This happens when LAVA assumes that the job cannot terminate or
        # with mal-formed job definitions. As we are always validating the
        # jobs, only the former is probable to happen. E.g.: When some LAVA
        # action timed out more times than expected in job definition.
        raise MesaCIException(
            f"LAVA job {job_id} failed with JobError "
            "(possible LAVA timeout misconfiguration/bug). Retry."
        )

    if metadata.get("case") == "validate":
        raise MesaCIException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )

    return metadata
|
|
|
|
|
|
def find_lava_error(job) -> None:
    """Scan the job's test results for infrastructure errors.

    Raises (via find_exception_from_metadata) when a retryable LAVA problem
    is found; otherwise marks the job as a plain failure.
    """
    results_yaml = _call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    for result in yaml.load(results_yaml, Loader=loader(False)):
        find_exception_from_metadata(result["metadata"], job.job_id)

    # If we reach this far, the job ended without a hwci script result and no
    # LAVA infrastructure problem was found.
    job.status = "fail"
|
|
|
|
|
|
def show_job_data(job):
    """Print every field of the LAVA job record in a collapsed GitLab section."""
    section = GitlabSection(
        "job_data",
        "LAVA job info",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
    )
    with section:
        job_info = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
        for field, value in job_info.items():
            print(f"{field}\t: {value}")
|
|
|
|
|
|
def fetch_logs(job, max_idle_time, log_follower) -> None:
    """Run one polling round: fetch new log lines, feed and print them.

    :param job: the LAVAJob being followed
    :param max_idle_time: timedelta after which a silent job is declared hung
    :param log_follower: LogFollower accumulating/sectioning the log stream
    :raises MesaCITimeoutError: when the job produced no output for longer
        than max_idle_time
    :raises MesaCIParseException: when the log RPC keeps returning corrupted
        YAML payloads
    """
    # Poll to check for new logs, assuming that a prolonged period of
    # silence means that the device has died and we should try it again
    if datetime.now() - job.last_log_time > max_idle_time:
        max_idle_time_min = max_idle_time.total_seconds() / 60

        raise MesaCITimeoutError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_YELLOW']}"
            f"LAVA job {job.job_id} does not respond for {max_idle_time_min} "
            "minutes. Retry."
            f"{CONSOLE_LOG['RESET']}",
            timeout_duration=max_idle_time,
        )

    time.sleep(LOG_POLLING_TIME_SEC)

    # The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
    # Retry the log fetching several times before exposing the error.
    for _ in range(5):
        with contextlib.suppress(MesaCIParseException):
            new_log_lines = job.get_logs()
            break
    else:
        # Every attempt returned corrupted data. The original code raised a
        # bare MesaCIParseException here; give the operator a reason instead.
        raise MesaCIParseException(
            f"Could not parse logs of LAVA job {job.job_id} after 5 attempts."
        )

    if log_follower.feed(new_log_lines):
        # If we had non-empty log data, we can assure that the device is alive.
        job.heartbeat()
    parsed_lines = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the RPC scheduler.jobs.logs get, it may
    # reach the LAVA_POST_PROCESSING phase.
    if log_follower.current_section.type in (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ):
        parsed_lines = job.parse_job_result_from_log(parsed_lines)

    for line in parsed_lines:
        print_log(line)
|
|
|
|
|
|
def follow_job_execution(job):
    """Submit the job, wait for it to start and stream its logs to stdout.

    :param job: a fresh LAVAJob holding the definition to submit
    :raises MesaCIException: when the submission fails, so the retry loop in
        retriable_follow_job() can handle it
    """
    try:
        submitted = job.submit()
    except Exception as mesa_ci_err:
        raise MesaCIException(
            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
        ) from mesa_ci_err

    if not submitted:
        # LAVAJob.submit() swallows MesaCIException internally and signals
        # failure through its return value. Without this check we would poll
        # job_state with job_id=None forever.
        raise MesaCIException("Could not submit LAVA job: submission failed.")

    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    print_log(f"Job {job.job_id} started.")

    gl = GitlabSection(
        id="lava_boot",
        header="LAVA boot",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    print(gl.start())
    max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
    with LogFollower(current_section=gl) as lf:
        # Start to check job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, lf)

    show_job_data(job)

    # Mesa Developers expect to have a simple pass/fail job result.
    # If this does not happen, it probably means a LAVA infrastructure error
    # happened.
    if job.status not in ["pass", "fail"]:
        find_lava_error(job)
|
|
|
|
|
|
def print_job_final_status(job):
    """Report the job's final status with an appropriate console color."""
    # A job still marked "running" at this point never reported a result:
    # treat it as hung.
    if job.status == "running":
        job.status = "hung"

    color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    print_log(f"{color}LAVA Job finished with status: {job.status}{CONSOLE_LOG['RESET']}")
|
|
|
|
|
|
def retriable_follow_job(proxy, job_definition) -> LAVAJob:
    """Run follow_job_execution(), retrying on known Mesa CI exceptions.

    :param proxy: XML-RPC proxy to the LAVA server
    :param job_definition: the YAML job definition to submit
    :return: the finished LAVAJob on success
    :raises MesaCIRetryError: once every attempt has been exhausted
    """
    retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    # One initial attempt plus retry_count retries.
    for attempt_no in range(1, retry_count + 2):
        job = LAVAJob(proxy, job_definition)
        try:
            follow_job_execution(job)
            return job
        except MesaCIKnownIssueException as found_issue:
            # A known flake: report it and retry without cancelling.
            print_log(found_issue)
            job.status = "canceled"
        except MesaCIException as mesa_exception:
            print_log(mesa_exception)
            job.cancel()
        except KeyboardInterrupt:
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            job.cancel()
            raise
        finally:
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in the attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )
            print_job_final_status(job)

    raise MesaCIRetryError(
        f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_RED']}"
        "Job failed after it exceeded the number of "
        f"{retry_count} retries."
        f"{CONSOLE_LOG['RESET']}",
        retry_count=retry_count,
    )
|
|
|
|
|
|
def treat_mesa_job_name(args):
    """Keep only the first word of the job name; spaces break lava-test-case."""
    first_word, _, _ = args.mesa_job_name.partition(" ")
    args.mesa_job_name = first_word
|
|
|
|
|
|
def main(args):
    """Entry point: generate, validate, submit and follow the LAVA job.

    Exits the process with code 0 when the job passed, 1 otherwise
    (unless --validate-only was given, in which case it returns early).
    """
    proxy = setup_lava_proxy()

    # Overwrite the timeout for the testcases with the value offered by the
    # user. The testcase running time should be at least 4 times greater than
    # the other sections (boot and setup), so we can safely ignore them.
    # If LAVA fails to stop the job at this stage, it will fall back to the
    # script section timeout with a reasonable delay.
    GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(minutes=args.job_timeout)

    job_definition = generate_lava_yaml(args)

    if args.dump_yaml:
        yaml_section = GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        )
        with yaml_section:
            # Scrub secrets (e.g. the JWT contents) before echoing.
            print(hide_sensitive_data(job_definition))

    job = LAVAJob(proxy, job_definition)

    errors = job.validate()
    if errors:
        fatal_err(f"Error in LAVA job definition: {errors}")
    print_log("LAVA job definition validated successfully")

    if args.validate_only:
        return

    finished_job = retriable_follow_job(proxy, job_definition)
    sys.exit(0 if finished_job.status == "pass" else 1)
|
|
|
|
|
|
def create_parser():
    """Build the command-line interface of the LAVA job submitter."""
    parser = argparse.ArgumentParser("LAVA job submitter")

    # Job identification and artifact locations.
    parser.add_argument("--pipeline-info")
    parser.add_argument("--rootfs-url-prefix")
    parser.add_argument("--kernel-url-prefix")
    parser.add_argument("--build-url")
    parser.add_argument("--job-rootfs-overlay-url")
    # Job behaviour.
    parser.add_argument("--job-timeout", type=int)
    parser.add_argument("--first-stage-init")
    parser.add_argument("--ci-project-dir")
    # Target device and boot configuration.
    parser.add_argument("--device-type")
    parser.add_argument("--dtb", nargs="?", default="")
    parser.add_argument("--kernel-image-name")
    parser.add_argument("--kernel-image-type", nargs="?", default="")
    parser.add_argument("--boot-method")
    parser.add_argument("--lava-tags", nargs="?", default="")
    # Credentials and submitter modes.
    parser.add_argument("--jwt-file", type=pathlib.Path)
    parser.add_argument("--validate-only", action="store_true")
    parser.add_argument("--dump-yaml", action="store_true")
    parser.add_argument("--visibility-group")
    parser.add_argument("--mesa-job-name")

    return parser
|
|
|
|
|
|
if __name__ == "__main__":
    # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, safe to say we don't need any
    # more buffering
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)

    parser = create_parser()

    parser.set_defaults(func=main)
    args = parser.parse_args()
    # Sanitize the job name before it gets embedded in the job definition.
    treat_mesa_job_name(args)
    args.func(args)
|