Source code for opsparser._manager._BaseHandler

import re
from abc import ABC, abstractmethod, ABCMeta
from typing import Any, Union
import inspect
import functools

class SingletonMeta(ABCMeta):
    """
    A metaclass that implements the Singleton pattern.
    Ensures only one instance of a class is created.
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """
        Override the __call__ method to implement Singleton behavior.
        Returns the existing instance if it exists, otherwise creates a new one.
        """
        if cls not in cls._instances:
            instance = super().__call__(*args, **kwargs)
            cls._instances[cls] = instance
        return cls._instances[cls]
    
    # ------------------------------------------------------------------
    # Intercept attribute access on the *class* itself so that instance-level
    # methods/properties transparently operate on the singleton instance.
    # ------------------------------------------------------------------
    def __getattribute__(cls, name):
        # Try to fetch the attribute directly from the class first.
        try:
            attr = super().__getattribute__(name)
        except AttributeError:
            # If the class doesn’t have the attribute, delegate the lookup
            # to the singleton instance (this preserves the old __getattr__
            # semantics).
            instance = cls()
            return getattr(instance, name)

        if name.startswith("__") and name.endswith("__"):
            return attr

        # Access the shared _instances dict directly via super to avoid
        # triggering our own __getattribute__ again (which would recurse).
        instances = super().__getattribute__("_instances")

        # If the singleton instance hasn't been created yet, simply return
        # the attribute unmodified so that normal instantiation flow is not
        # disrupted (the instance may require arguments).
        if cls not in instances:
            return attr

        # From here on, we are guaranteed that the singleton instance already
        # exists, so we can safely delegate or wrap attributes.

        # Retrieve the raw descriptor stored in the class dictionary (if any)
        raw_attr = cls.__dict__.get(name, None)

        # Do not wrap classmethod / staticmethod – they already work at the
        # class level as intended.
        if isinstance(raw_attr, (staticmethod, classmethod)):
            return attr

        # For ``property`` descriptors, return the computed value from the
        # singleton instance instead of the descriptor object itself.
        if isinstance(raw_attr, property):
            instance = instances[cls]
            return getattr(instance, name)

        # If the attribute is a plain function (i.e. an *instance* method),
        # return a wrapper that binds it to the singleton instance so that
        # callers can invoke it directly on the class: ``MyClass.foo(...)``.
        if inspect.isfunction(attr):
            @functools.wraps(attr)
            def _singleton_bound(*args, **kwargs):
                instance = instances[cls]
                return getattr(instance, name)(*args, **kwargs)

            return _singleton_bound

        # Otherwise, return the attribute unmodified.
        return attr

    # def __getattr__(cls, name):
    #     """Fallback for *missing* attributes.

    #     If the singleton instance already exists, delegate attribute lookup to
    #     it.  Otherwise raise :class:`AttributeError` without creating a new
    #     instance, so that normal instantiation (which may require
    #     constructor arguments) is not disrupted.
    #     """

    #     instances = super().__getattribute__("_instances")
    #     if cls in instances:
    #         instance = instances[cls]
    #         return getattr(instance, name)

    #     raise AttributeError(
    #         f"Neither the class '{cls.__name__}' nor its singleton instance has an attribute '{name}'"
    #     )

[docs] class BaseHandler(ABC, metaclass=SingletonMeta): # --------------------------------------------------------------------- # Abstract Property - MUST be implemented by subclasses # --------------------------------------------------------------------- @property @abstractmethod def _COMMAND_RULES(self) -> dict[str, dict[str, Any]]: """ A set of parsing rules for the most commonly used OpenSeesPy commands. Each entry describes how positional arguments should be mapped and what optional *flag*-style arguments exist. A trailing ``*`` on the key name indicates that the value can contain an arbitrary number of tokens which will be returned as a :class:`list`. Example(OpenSeesPy Commands): node(nodeTag, *crds, '-ndf', ndf, '-mass', *mass, '-disp', ...) mass(nodeTag, *massValues) element(eleType, tag, *eleNodes, *eleArgs) uniaxialMaterial(matType, matTag, *matArgs) timeSeries(typeName, tag, *args) load(tag, *args) Example(rule set dict): { "node": { "positional": ["tag", "coords*"], "options": { "-ndf": "ndf", "-mass": "mass*", "-disp": "disp*", "-vel": "vel*", "-accel": "accel*", }, }, "mass": { "positional": ["tag", "mass*"], }, "element": { "positional": ["eleType", "tag", "args*"], }, "uniaxialMaterial": { "positional": ["matType", "matTag", "args*"], }, "timeSeries": { "positional": ["typeName", "tag", "args*"], }, "load": { "positional": ["tag", "args*"], }, } """ raise NotImplementedError # --------------------------------------------------------------------- # Abstract API - MUST be implemented by subclasses # --------------------------------------------------------------------- @staticmethod @abstractmethod def handles() -> list[str]: """Return a list of function names this handler can process.""" raise NotImplementedError
[docs] @staticmethod @abstractmethod def handles() -> list[str]: """返回该处理器支持的命令列表(如 element / uniaxialMaterial / nDMaterial)""" raise NotImplementedError
[docs] @abstractmethod def handle(self, func_name: str, arg_map: dict[str, Any]): """Process the function *func_name* using the already parsed *arg_map*.""" raise NotImplementedError
[docs] @abstractmethod def clear(self): """Reset internal data maintained by a concrete handler.""" raise NotImplementedError
# ------------------------------------------------------------------ # Generic helpers shared by all handlers # ------------------------------------------------------------------ @staticmethod def _extract_args_by_str(lst: list[Any], target_keys: Union[str, list, tuple, set]) -> list[Any]: """Return the values *following* any of *target_keys* until the next string token is encountered. Parameters ---------- lst : list[Any] The full argument list. target_keys : Union[str, list, tuple, set] A single key or an iterable of keys that should be searched for. Returns ------- list[Any] List of non-string values following any target key until the next string. Notes ----- - If lst is None or empty, returns an empty list - If target_keys is None, treats it as an empty collection - If no target key is found, returns an empty list """ if lst is None or len(lst) == 0: return [] result: list[Any] = [] found = False if target_keys is None: target_keys = set() elif isinstance(target_keys, str): target_keys = {target_keys} else: try: target_keys = set(target_keys) except (TypeError, ValueError): target_keys = {target_keys} for item in lst: if found: if isinstance(item, str): break result.append(item) elif not isinstance(item, list) and item in target_keys: found = True return result @staticmethod def _parse_rule_based_command(rule: dict[str, Any], *args: Any, **kwargs:Any) -> dict[str, Any]: """Parse command arguments according to a specific rule.""" result: dict[str, Any] = {} arg_list: list[Any] = [x for x in list(args) if x != {} and x is not None] # Parse positional arguments result.update(BaseHandler._parse_positional_args(rule, arg_list)) # 记录解析选项标志前的args orig_args = result.get("args", []) # Parse option flags option_result = BaseHandler._parse_option_flags(rule, arg_list) result.update(option_result) # 如果有解析出选项标志, 并且存在args字段, 则清理args中的选项标志及其值 if option_result and "args" in result and orig_args: # 获取所有选项标志 option_flags = rule.get("options", {}).keys() # 清理args中的选项标志及其值 cleaned_args = [] skip_count = 0 for i, item in enumerate(orig_args): if skip_count > 0: skip_count -= 1 continue # 如果当前项是选项标志, 跳过它及其值 if isinstance(item,str) and item in option_flags: # 获取这个选项后面的值的数量 values = BaseHandler._extract_args_by_str(orig_args[i:], item) skip_count = len(values) continue cleaned_args.append(item) # 更新args字段 if cleaned_args: result["args"] = cleaned_args else: # 如果清理后args为空, 则移除args字段 result.pop("args", None) # kwargs take precedence over everything parsed from *args* result.update(kwargs) return result
[docs] @staticmethod def get_name_and_count(origin_name: str) -> tuple[str, int]: """Get name and count from a name with count suffix.""" if '*' in origin_name: match = re.match(r"(.+?)\*(\d+)$", origin_name) if match: name, count = match.groups() return name.rstrip('?'), int(count) elif origin_name.endswith("*"): name = origin_name.rstrip('*') return name.rstrip('?'), "all" return origin_name.rstrip('?'), 1
@staticmethod def _parse_positional_args(rule: dict[str, Any], arg_list: list[Any]) -> dict[str, Any]: """Parse positional arguments according to rule.""" result: dict[str, Any] = {} idx = 0 stop_idx = len(arg_list) for name in rule.get("positional", []): # 检查是否是带有数字限制的参数模式, 如 name*2 clean_name, count = BaseHandler.get_name_and_count(name) if isinstance(count, int): if idx+count > stop_idx: if '?' in name: count = 0 else: raise ValueError(f"Invalid ini value for positional argument {name}: {count =} must be less than {stop_idx-idx}") result[clean_name] = arg_list[idx] if count == 1 else None if count == 0 else arg_list[idx:idx + count] idx += count elif isinstance(count,str) and count == "all": # Consume tokens until next recognised option flag (if any) for flag in rule.get("options", {}): pure_flag = flag.split('*')[0].rstrip('?') if pure_flag in arg_list[idx:]: candidate = arg_list.index(pure_flag, idx) stop_idx = min(stop_idx, candidate) result[clean_name] = arg_list[idx:stop_idx] idx = stop_idx else: # Handle unknown count format raise ValueError(f"Invalid parameter format for {name}: {count =} must be int or 'all'") # Store unconsumed positional tokens under "args" if idx < len(arg_list): result.setdefault("args", []).extend(arg_list[idx:]) return result @staticmethod def _parse_option_flags(rule: dict[str, Any], arg_list: list[Any]) -> dict[str, Any]: """Parse option flags according to rule.""" result: dict[str, Any] = {} for flag, name in rule.get("options", {}).items(): pure_flag = flag.split('*')[0].rstrip('?') if pure_flag in arg_list: values = BaseHandler._extract_args_by_str(arg_list, pure_flag) idx = 0 stop_idx = len(values) if isinstance(name, str): name = [name] # 这些参数可能对应多个参数,需要分别处理 for subname in name: clean_name, count = BaseHandler.get_name_and_count(subname) if isinstance(count, int): if idx+count > stop_idx: if '?' in flag: count = 0 else: raise ValueError(f"Invalid ini value for optional argument {subname}: {count =} must be less than {stop_idx-idx}") result[clean_name] = values[idx] if count == 1 else None if count == 0 else values[idx:idx + count] idx += count elif isinstance(count, str) and count == "all": result[clean_name] = values[idx:stop_idx] if stop_idx > idx else values[idx] idx = stop_idx else: # Handle unknown count format raise ValueError(f"Invalid parameter format for {subname}: {count =} must be int or 'all'") return result # ------------------------------------------------------------------ # Universal command-line like argument parser # ------------------------------------------------------------------ def _parse(self, func_name: str, *args: Any, **kwargs: Any) -> dict[str, Any]: """ Parse OpenSeesPy command arguments (*args, **kwargs*) into a standardized dictionary. Rules: 1. The command name determines how positional arguments are assigned via :pyattr:`_COMMAND_RULES`. Names ending with "*" absorb all remaining arguments as a list. 2. Flag-style (e.g. '-mass') options are parsed with their associated values, following TCL conventions. 3. Any explicit **kwargs take precedence over parsed values. If the command is not found in :pyattr:`_COMMAND_RULES`, a default parser is used to extract flags and positional arguments, aiming for practical compatibility with typical OpenSeesPy usage. Returns: dict[str, Any]: Parsed argument mapping for the handler. """ kwargs = dict(kwargs or {}) # copy to avoid mutating caller data rule = self._COMMAND_RULES.get(func_name) # If rule has alternatives, means this command has many alternative rules, so need to check which rule to use alternative = rule.get("alternative", False) if alternative: # if not isinstance(rule, defaultdict): # warnings.warn(f"Rule for command {func_name} is not a defaultdict; unexpected behavior may occur.", UserWarning, stacklevel=2) specific_rule = rule[args[0]] # if not defaultdict and args[0] is not a key, will raise KeyError return self._parse_rule_based_command(specific_rule, *args, **kwargs) # Otherwise use rule-based parsing directly return self._parse_rule_based_command(rule, *args, **kwargs) # ----------------------------------------------------------------- # registration utility # ----------------------------------------------------------------- def _register(self, registry: dict[str, "BaseHandler"]) -> None: """ 注册该处理器可处理的类型到注册表中 Parameters ---------- registry : dict[str, BaseHandler] Manager维护的 {类型: 处理器} 映射 """ for arg_type in self.types(): registry[arg_type] = self def __repr__(self) -> str: """ Return string representation of this handler, including class name and list of commands it can handle """ # Get all commands that can be handled try: handled_commands = self.handles() cmd_str = ", ".join(handled_commands) except NotImplementedError: cmd_str = "Not implemented" except Exception as e: cmd_str = f"Failed to retrieve: {str(e)}" return f"{self.__class__.__name__} handles Commands: ({cmd_str})"
class SubBaseHandler(BaseHandler): @staticmethod @abstractmethod def types() -> list[str]: """Return a list of types this handler supports (e.g. eleType for element command or matType for material command)""" raise NotImplementedError def __repr__(self) -> str: """ Return string representation of subhandler, including class name, list of commands and supported types """ # Get base class representation base_repr = f"{self.__class__.__name__} handles Commands: ({", ".join(self.handles())})" # Get all supported types try: supported_types = self.types() types_str = ", ".join(supported_types) except NotImplementedError: types_str = "Not implemented" except Exception as e: types_str = f"Failed to retrieve: {str(e)}" # Extract class name and command part from base representation, then add type information return base_repr + f", with SubTypes: ({types_str})"