API Reference

class firexapp.application.ExitSignalHandler(app)[source]

Bases: object

first_warning = '\nExiting due to signal %s'
last_warning = "\nFINE! We'll stop. But you might have leaked a celery instance or a broker instance."
second_warning = '\nWe know! Have a little patience for crying out loud!'
class firexapp.application.FireXBaseApp(submit_app=None, info_app=None)[source]

Bases: object

create_arg_parser(description=None) → argparse.ArgumentParser[source]
main_error_exit_handler(reason=None)[source]
run(sys_argv=None)[source]
firexapp.application.get_app_task(task_short_name: str, all_tasks=None)[source]
firexapp.application.get_app_tasks(tasks, all_tasks=None)[source]
firexapp.application.import_microservices(plugins_files=None, imports: tuple = None) → [][source]
firexapp.application.main()[source]
class firexapp.celery_manager.CeleryManager(plugins=None, logs_dir=None, worker_log_level='debug', cap_concurrency=None, app='firexapp.engine', celery_bin_dir='', env=None, broker=None)[source]

Bases: object

static cap_cpu_count(count, cap_concurrency)[source]
property celery_bin
celery_bin_name = 'celery'
property celery_logs_dir
property celery_pids_dir
extract_errors_from_celery_logs(celery_log_file, max_errors=20)[source]
find_all_procs()[source]
static find_procs(pid_file)[source]
static get_celery_logs_dir(logs_dir)[source]
static get_celery_pids_dir(logs_dir)[source]
classmethod get_pid(logs_dir, workername, hostname='b25de1dcc3f3')[source]
classmethod get_pid_file(logs_dir, workername, hostname='b25de1dcc3f3')[source]
classmethod get_pid_from_file(pid_file)[source]
static get_plugins_env(plugins)[source]
static get_worker_and_host(workername, hostname)[source]
classmethod get_worker_log_file(logs_dir, worker_and_host)[source]
static get_worker_logs_dir(logs_dir)[source]
classmethod get_worker_pids(logs_dir, hostname, workernames)[source]
kill_all_forked(pid_file)[source]
classmethod log(msg, header=None, level=10)[source]
shutdown(timeout=60)[source]
start(workername, queues=None, wait=True, timeout=900, concurrency=None, worker_log_level=None, app=None, cap_concurrency=None, cwd=None, soft_time_limit=None)[source]
classmethod terminate(pid, timeout=60)[source]
update_env(env)[source]
wait_for_shutdown(timeout=15)[source]
wait_until_active(pid_file, stdout_file, workername, timeout=900)[source]
property workers_logs_dir
exception firexapp.celery_manager.CeleryWorkerStartFailed[source]

Bases: Exception

firexapp.common.delimit2list(str_to_split, delimiters=(',', ';', '|', ' ')) → [][source]
firexapp.common.find(keys, input_dict)[source]
firexapp.common.find_procs(name, cmdline_regex=None, cmdline_contains=None)[source]
firexapp.common.get_available_port(so_reuseport=True)[source]
firexapp.common.poll_until_dir_empty(dir_path, timeout=15)[source]
firexapp.common.poll_until_existing_file_not_empty(file_path, timeout=10)[source]
firexapp.common.poll_until_file_exist(file_path, timeout=10)[source]
firexapp.common.poll_until_file_not_empty(file_path, timeout=10)[source]
firexapp.common.poll_until_path_exist(path, timeout=10)[source]
firexapp.common.proc_matches(proc_info, pname, cmdline_regex, cmdline_contains)[source]
firexapp.common.qualify_firex_bin(bin_name)[source]
firexapp.common.select_env_vars(env_names)[source]
firexapp.common.silent_mkdir(path, mode=511, exist_ok=True)[source]
firexapp.common.wait_until(predicate, timeout, sleep_for, *args, **kwargs)[source]
firexapp.common.wait_until_pid_not_exist(pid, timeout=7, sleep_for=1)[source]
firexapp.discovery.discover_package_modules(current_path, root_path=None) → [][source]
firexapp.discovery.find_firex_task_bundles() → [][source]
class firexapp.fileregistry.FileRegistry(from_file=None)[source]

Bases: object

classmethod destroy()[source]
dump_to_file(path)[source]
get_file(key, uid_or_logsdir)[source]
get_relative_path(key)[source]
static read_from_file(path)[source]
register_file(key, relative_path)[source]
static resolve_path(uid_or_logsdir, relative_path)[source]
exception firexapp.fileregistry.KeyAlreadyRegistered[source]

Bases: Exception

exception firexapp.fileregistry.KeyNotRegistered[source]

Bases: Exception

class firexapp.fileregistry.Singleton[source]

Bases: type

class firexapp.info.InfoBaseApp[source]

Bases: object

create_info_sub_parser(sub_parser)[source]
create_list_sub_parser(sub_parser)[source]
classmethod parse_task_docstring(task)[source]
static print_argument_used(plugins: str)[source]
static print_available_microservices(plugins: str)[source]
print_details(entity, plugins, all_tasks=None)[source]
classmethod print_task_details(task)[source]
run_info(args)[source]
run_list(args)[source]
firexapp.info.get_argument_use(all_tasks) → dict[source]
class firexapp.plugins.CommaDelimitedListAction(option_strings, dest, nargs=None, **kwargs)[source]

Bases: argparse.Action

firexapp.plugins.cdl2list(external_files)[source]
firexapp.plugins.create_replacement_task(original, name_postfix, sigs)[source]
firexapp.plugins.find_plugin_file(file_path)[source]
firexapp.plugins.get_active_plugins()[source]
firexapp.plugins.get_plugin_module_list(external_files=None)[source]
firexapp.plugins.get_plugin_modules(external_files)[source]
firexapp.plugins.get_short_name(long_name: str) → str[source]
firexapp.plugins.identify_duplicate_tasks(all_tasks, priority_modules: list) → [[]][source]

Returns a list of substitutions. Each substitution is a list of microservices. The last one in each list is the ‘dominant’ one, i.e. the one that will be used.

firexapp.plugins.import_plugin_module(module_name, external_files)[source]
firexapp.plugins.load_plugin_modules(external_files=None)[source]
firexapp.plugins.merge_plugins(plugins_list1, plugins_list2='') → [][source]

Merge two comma-delimited lists of plugins into a single list. The right-most plugin is the most significant.

firexapp.plugins.set_plugins_env(external_files)[source]
exception firexapp.submit.arguments.ChainArgException[source]

Bases: Exception

class firexapp.submit.arguments.InputConverter[source]

Bases: object

This class uses a singleton object design to store converters which parse the cli arguments. Converter functions are stored into the singleton InputConverter object by adding the @register decorator to the top of each desired function.

classmethod convert(pre_load=None, **kwargs) → dict[source]

Activates conversion. kwargs provided are passed to any registered converter. This function should be called twice, and only twice. Once with initially loaded converters, and then with the secondary ones.

Parameters

pre_load – Used for testing. preload is defaulted to None and will auto populate

classmethod instance() → firexkit.argument_conversion.ConverterRegister[source]

Used for unit testing only

pre_load_was_run = False
classmethod register(*args)[source]

Registers a callable object to be run during conversion. The callable should take in kwargs, and return a dict with any changes to the input arguments, or None if no changes are necessary.

Example single argument converter

@InputConverter.register
@SingleArgDecorator('something')
def convert_something(arg_value):
    arg_value = arg_value.upper()
    return arg_value

Optionally, dependencies can be defined at registration

@InputConverter.register('other_converter', 'and_another_converter')
@SingleArgDecorator('something')
def convert_something(arg_value):
    arg_value = arg_value.upper()
    return arg_value

Conversion occurs on two occasions: before microservices are loaded, and after. You can explicitly mark a converter to run before or after ALL the microservices are loaded by passing True (pre) or False (post) during registration. This design is used in the spirit of failing fast, providing early failure of runs before the bulk of microservices are imported. If a bool is not provided, the converter will be registered to run pre-loading unless loading has already occurred.

@InputConverter.register('other_converter', False)
@SingleArgDecorator('something')
def convert_something(arg_value):
    ...
    return arg_value

When a conversion fails, the given function can simply raise an exception to instruct the user how to correct their inputs.

firexapp.submit.arguments.auto_load_pydev_debugging_plugin(kwargs)[source]
firexapp.submit.arguments.convert_booleans(kwargs)[source]

Converts standard true/false/none values to bools and None

firexapp.submit.arguments.find_unused_arguments(chain_args: {}, ignore_list: [], all_tasks: [])[source]

Function to detect any arguments that are not explicitly consumed by any microservice.

Note

This should be run AFTER all microservices have been loaded.

Parameters
  • chain_args (dict) – The dictionary of chain args to check

  • ignore_list (list) – A list of exception arguments that are acceptable. This usually includes application args.

  • all_tasks – A list of all microservices. Usually app.tasks

Returns

A dictionary of un-applicable arguments

firexapp.submit.arguments.get_chain_args(other_args: [])[source]

This function converts a flat list of --key value pairs into a dictionary

firexapp.submit.arguments.whitelist_arguments(argument_list: Union[str, list])[source]

Function for adding argument keys to the global argument whitelist. Used during validation of input arguments

Parameters

argument_list (list) – List of argument keys to whitelist.

class firexapp.submit.console.DistlibWarningsFilter(name='')[source]

Bases: logging.Filter

filter(record)[source]

Determine if the specified record is to be logged.

Is the specified record to be logged? Returns 0 for no, nonzero for yes. If deemed appropriate, the record may be modified in-place.

class firexapp.submit.console.FireXColoredConsoleFormatter(*args, **kwargs)[source]

Bases: colorlog.colorlog.TTYColoredFormatter

format(record)[source]

Format a message from a record object.

class firexapp.submit.console.RetryFilter(name='')[source]

Bases: logging.Filter

filter(record)[source]

Determine if the specified record is to be logged.

Is the specified record to be logged? Returns 0 for no, nonzero for yes. If deemed appropriate, the record may be modified in-place.

firexapp.submit.console.add_filter_to_console(log_filter)[source]
firexapp.submit.console.set_console_log_level(log_level)[source]
firexapp.submit.console.setup_console_logging(module=None, stdout_logging_level=20, console_logging_formatter='%(green)s[%(asctime)s]%(reset)s[%(hostname)s] %(log_color)s%(message)s', console_datefmt='%H:%M:%S', stderr_logging_level=40, module_logger_logging_level=None)[source]
class firexapp.submit.reporting.ReportGenerator[source]

Bases: abc.ABC

abstract add_entry(key_name, value, priority, formatters, **extra)[source]
filter_formatters(all_formatters)[source]
formatters = ()
abstract post_run_report(root_id=None, **kwargs)[source]

This could run in the context of __main__ if --sync is used, otherwise in the context of celery. So the instance cannot be assumed to be the same as in pre_run_report()

static pre_run_report(**kwarg)[source]

This runs in the context of __main__

class firexapp.submit.reporting.ReportersRegistry[source]

Bases: object

classmethod get_generators()[source]
classmethod post_run_report(results, kwargs)[source]
classmethod pre_run_report(kwargs)[source]
firexapp.submit.reporting.recurse_results_tree(results)[source]
firexapp.submit.reporting.report(key_name=None, priority=-1, **formatters)[source]

Use this decorator to indicate what returns to include in the report and how to format it

class firexapp.submit.submit.SubmitBaseApp(submission_tmp_file=None)[source]

Bases: object

DEFAULT_MICROSERVICE = None
PRIMARY_WORKER_NAME = 'mc'
SUBMISSION_LOGGING_FORMATTER = '[%(asctime)s %(levelname)s] %(message)s'
check_for_failures(chain_result, chain_args)[source]
convert_chain_args(chain_args) → dict[source]
copy_submission_log()[source]
create_submit_parser(sub_parser)[source]
static error_banner(err_msg, banner_title='ERROR', logf=<bound method Logger.error of <Logger firexapp.submit.submit (WARNING)>>)[source]
static get_all_failures(chain_result)[source]
graceful_exit_on_failure(failure_caption: str)[source]
init_file_logging()[source]
log_preamble()[source]

Overridable method to allow a firex application to log on startup

main_error_exit_handler(chain_details=None, reason=None)[source]
process_other_chain_args(args, other_args) → {}[source]
run_submit(args, others)[source]
self_destruct(chain_details=None, reason=None)[source]
set_broker_in_app()[source]
start_broker(args)[source]
start_celery(args, plugins)[source]
start_engine(args, chain_args, uid) → {}[source]
start_tracking_services(args, **chain_args) → {}[source]
submit(args, others)[source]
classmethod validate_argument_applicability(chain_args, args, all_tasks)[source]
wait_tracking_services_pred(service_predicate, description, timeout) → None[source]
wait_tracking_services_release_console_ready(timeout=5) → None[source]
wait_tracking_services_task_ready(timeout=5) → None[source]
firexapp.submit.submit.add_uid_to_conf(conf=None, **kwargs)[source]
firexapp.submit.submit.get_firex_id_from_output(cmd_output: str) → str[source]
firexapp.submit.submit.get_log_dir_from_output(cmd_output: str) → str[source]
class firexapp.submit.tracking_service.TrackingService[source]

Bases: abc.ABC

extra_cli_arguments(arg_parser)[source]
ready_for_tasks(**kwargs) → bool[source]
ready_release_console(**kwargs) → bool[source]
abstract start(args, **kwargs) → {}[source]
firexapp.submit.tracking_service.get_service_name(service: firexapp.submit.tracking_service.TrackingService) → str[source]
firexapp.submit.tracking_service.get_tracking_services()[source]
firexapp.submit.tracking_service.prune_duplicate_module_entry_points(entry_points)[source]
class firexapp.submit.uid.Uid(identifier=None)[source]

Bases: object

property base_logging_dir
create_debug_dir()[source]
create_logs_dir()[source]
property debug_dir
debug_dirname = 'debug'
property logs_dir
class firexapp.tasks.core_tasks.BrokerShutdown(parent, **kwargs)[source]

Bases: celery.bootsteps.celery.bootsteps.StartStopStep

This celery shutdown step will cleanup redis

label = 'Broker'
name = 'firexapp.tasks.core_tasks.BrokerShutdown'
shutdown(parent)[source]
firexapp.tasks.core_tasks.get_configured_root_task()[source]
firexapp.tasks.core_tasks.handle_firex_root_completion(sender, task, task_id, args, kwargs, **do_not_care)[source]
class firexapp.testing.config_base.FlowTestConfiguration[source]

Bases: object

abstract assert_expected_firex_output(cmd_output, cmd_err)[source]
abstract assert_expected_return_code(ret_value)[source]
abstract initial_firex_options() → list[source]
property name
class firexapp.testing.config_base.InterceptFlowTestConfiguration[source]

Bases: firexapp.testing.config_base.FlowTestConfiguration

assert_expected_firex_output(cmd_output, cmd_err)[source]
abstract assert_expected_options(captured_kwargs)[source]
assert_expected_return_code(ret_value)[source]
abstract intercept_service() → str[source]

Name of the microservice that will be mocked into the validator capturing the options that get compared by assert_expected_options()

firexapp.testing.config_base.assert_is_bad_run(ret_value)[source]
firexapp.testing.config_base.assert_is_good_run(ret_value)[source]
firexapp.testing.config_base.discover_tests(tests, config_filter='') → list[source]
firexapp.testing.config_base.import_test_configs(path) → [][source]
firexapp.testing.config_base.skip_test(cls)[source]
class firexapp.testing.config_interpreter.ConfigInterpreter[source]

Bases: object

cleanup_after_timeout(std_out, std_err)[source]
collect_plugins(flow_test_config) → [][source]
create_cmd(flow_test_config) → [][source]
static create_mock_file(results_folder, results_file, test_name, intercept_microservice)[source]
document_viewer(file_path: str) → str[source]
execution_directory = None
get_exe(flow_test_config) → [][source]
static get_intercept_results_file(flow_test_config)[source]
static get_test_name(flow_test_config)[source]
static is_instance_of_intercept(test_config: firexapp.testing.config_base.FlowTestConfiguration)[source]
static is_submit_command(test_config: firexapp.testing.config_base.FlowTestConfiguration)[source]
on_test_exit(std_out, std_err)[source]
run_executable(cmd, flow_test_config)[source]
run_integration_test(flow_test_config, results_folder)[source]
firexapp.testing.coverage_plugin.find_in_stack(file_to_find) → bool[source]
firexapp.testing.coverage_plugin.is_celery() → bool[source]
firexapp.testing.coverage_plugin.is_running_under_coverage() → bool[source]
firexapp.testing.coverage_plugin.restart_celery_under_coverage()[source]
class firexapp.testing.test_infra.FlowTestInfra(methodName='runTest')[source]

Bases: unittest.case.TestCase

config_interpreter = <firexapp.testing.config_interpreter.ConfigInterpreter object>
failures = 0
max_acceptable_failures = None
classmethod populate_tests()[source]
results_dir = None
setUp()[source]

Hook method for setting up the test fixture before exercising it.

tearDown()[source]

Hook method for deconstructing the test fixture after testing it.

test_configs = []
firexapp.testing.test_infra.default_main()[source]
firexapp.testing.test_infra.main(default_results_dir, default_test_dir)[source]