app.utility namespace¶
Submodules¶
app.utility.base_knowledge_svc module¶
- class app.utility.base_knowledge_svc.BaseKnowledgeService¶
Bases:
BaseService
app.utility.base_obfuscator module¶
app.utility.base_object module¶
- class app.utility.base_object.AppConfigGlobalVariableIdentifier¶
Bases:
object
- classmethod is_global_variable(variable)¶
- class app.utility.base_object.BaseObject¶
Bases:
BaseWorld
- property access¶
- static clean(d)¶
- property created¶
- property display¶
- display_schema = None¶
- static hash(s)¶
- classmethod load(dict_obj)¶
- load_schema = None¶
- match(criteria)¶
- replace_app_props(encoded_string)¶
- static retrieve(collection, unique)¶
- schema = None¶
- search_tags(value)¶
- update(field, value)¶
Updates the given field to the given value as long as the value is not None and the new value is different from the current value. Ignoring None prevents current property values from being overwritten to None if the given property is not intentionally passed back to be updated (example: Agent heartbeat)
- Parameters
field – object property to update
value – value to update to
app.utility.base_parser module¶
- class app.utility.base_parser.BaseParser(parser_info)¶
Bases:
object
- static broadcastip(blob)¶
- static email(blob)¶
Parse out email addresses :param blob: :return:
- static filename(blob)¶
Parse out filenames :param blob: :return:
- static ip(blob)¶
- static line(blob)¶
Split a blob by line :param blob: :return:
- static load_json(blob)¶
- static set_value(search, match, used_facts)¶
Determine the value of a source/target for a Relationship :param search: a fact property to look for; either a source or target fact :param match: a parsing match :param used_facts: a list of facts that were used in a command :return: either None, the value of a matched used_fact, or the parsing match
app.utility.base_planning_svc module¶
- class app.utility.base_planning_svc.BasePlanningService(global_variable_owners=None)¶
Bases:
BaseService
- add_global_variable_owner(global_variable_owner)¶
Adds a global variable owner to the internal registry.
These will be used for identification of global variables when performing variable-fact substitution.
- Args:
- global_variable_owner: An object that exposes an is_global_variable(…) method and accepts a string
containing a bare/unwrapped variable.
- async add_test_variants(links, agent, facts=(), rules=(), operation=None, trim_unset_variables=False, trim_missing_requirements=False)¶
Create a list of all possible links for a given set of templates
- Parameters
links –
agent –
facts –
rules –
operation –
trim_unset_variables –
trim_missing_requirements –
- Returns
updated list of links
- is_global_variable(variable)¶
- async obfuscate_commands(agent, obfuscator, links)¶
- re_index = re.compile('(?<=\\[filters\\().+?(?=\\)\\])')¶
- re_limited = re.compile('#{.*\\[*\\]}')¶
- re_trait = re.compile('(?<=\\{).+?(?=\\[)')¶
- re_variable = re.compile('#{(.*?)}', re.DOTALL)¶
- async static remove_completed_links(operation, agent, links)¶
Remove any links that have already been completed by the operation for the agent
- Parameters
operation –
links –
agent –
- Returns
updated list of links
- async static remove_links_above_visibility(links, operation)¶
- async static remove_links_with_unset_variables(links)¶
Remove any links that contain variables that have not been filled in.
- Parameters
links –
- Returns
updated list of links
- async trim_links(operation, links, agent)¶
- Trim links in supplied list. Where ‘trim’ entails:
adding all possible test variants
removing completed links (i.e. agent has already completed)
removing links that did not have template fact variables replaced by fact values
- Parameters
operation –
links –
agent –
- Returns
trimmed list of links
app.utility.base_service module¶
app.utility.base_world module¶
- class app.utility.base_world.AccessSchema(*, only: Union[Sequence[str], Set[str]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Dict = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: str = None)¶
Bases:
Schema
- opts: SchemaOpts = <marshmallow.schema.SchemaOpts object>¶
- class app.utility.base_world.BaseWorld¶
Bases:
object
A collection of base static functions for service & object module usage
- class Access(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
Bases:
Enum
- APP = 0¶
- BLUE = 2¶
- HIDDEN = 3¶
- RED = 1¶
- class Privileges(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
Bases:
Enum
- Elevated = 1¶
- User = 0¶
- TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'¶
- static apply_config(name, config)¶
- static check_requirement(params)¶
- static clear_config()¶
- static create_logger(name)¶
- static decode_bytes(s, strip_newlines=True)¶
- static encode_string(s)¶
- static generate_name(size=16)¶
- static generate_number(size=6)¶
- static get_config(prop=None, name=None)¶
- static get_current_timestamp(date_format='%Y-%m-%dT%H:%M:%SZ')¶
- static get_timestamp_from_string(datetime_str, date_format='%Y-%m-%dT%H:%M:%SZ')¶
- static is_base64(s)¶
- static is_uuid4(s)¶
- static jitter(fraction)¶
- async static load_module(module_type, module_info)¶
- static prepend_to_file(filename, line)¶
- re_base64 = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', re.DOTALL)¶
- static set_config(name, prop, value)¶
- static strip_yml(path)¶
- class app.utility.base_world.PrivilegesSchema(*, only: Union[Sequence[str], Set[str]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Dict = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: str = None)¶
Bases:
Schema
- opts: SchemaOpts = <marshmallow.schema.SchemaOpts object>¶
app.utility.config_generator module¶
- app.utility.config_generator.ensure_local_config()¶
Checks if a local.yml config file exists. If not, generates a new local.yml file using secure random values.
- app.utility.config_generator.log_config_message(config_path)¶
- app.utility.config_generator.make_secure_config()¶
app.utility.file_decryptor module¶
- app.utility.file_decryptor.decrypt(filename, configuration, output_file=None, b64decode=False)¶
- app.utility.file_decryptor.get_encryptor(salt, key)¶
- app.utility.file_decryptor.read(filename, encryptor)¶
app.utility.payload_encoder module¶
This module contains helper functions for encoding and decoding payload files.
If AV is running on the server host, then it may sometimes flag, quarantine, or delete CALDERA payloads. To help prevent this, encoded payloads can be used to prevent AV from breaking the server. The convention expected by the server is that encoded payloads will be XOR’ed with the DEFAULT_KEY contained in the payload_encoder.py module.
Additionally, payload_encoder.py can be used from the command-line to add a new encoded payload.
`
python /path/to/payload_encoder.py input_file output_file
`
NOTE: In order for the server to detect the availability of an encoded payload, the payload file’s name must end in the .xored extension.
- app.utility.payload_encoder.xor_bytes(in_bytes, key=None)¶
- app.utility.payload_encoder.xor_file(input_file, output_file=None, key=None)¶