工程经验 - 算法的工程化与插件化

最近在做一个开源项目 - synthetic-data-generator这是个基于 pluggy ,将算法工程化、 插件化的项目。本文将介绍在做此项目时,了解到的插件技术,和大家聊聊 pluggy,聊聊 python 中的插件系统设计。

总体来说: synthetic-data-generator = 内外部插件系统设计 + 自动扫描路径注册插件 + 实例化获取插件 。

一、背景

synthetic-data-generator 这个项目的功能是表格数据合成,即给一个原始数据表,能产出一个保真度挺高的合成数据表。在数据合成的领域里,有基于统计学的方法,例如,高斯 Copula、马尔科夫链等。也有机器学习的方法,例如 GAN 一类的生成对抗网络,CTGAN、OCT-GAN,还有 VAE 一类变分自编码器的,例如 TVAE。

Tips: 为什么需要这么多种算法,这不是在做重复工作么?

因为数据合成领域每种方法都有各自的优势,效果(保真度、隐私保护、性能)不一。目前还没有一个大而统一的方案可以每个方面都做得很好。

工程领域,我们知道在做一个业务系统时,我们会根据业务建模,遵循设计原则,应用某些设计模式,为此设计出清晰的架构,使得业务系统有着良好的维护性和健壮性。但在科研领域,往往不具备这些考虑,考虑地更多的可能是复现论文实验、跑通样例、尽早中刊。因此,做算法科研的,提供出来的东西健壮性和可维护性是较差的,往往跑这种数据报错,跑那种数据又报错。

就算法到落地应用的健壮性和可维护性,有以下两个普遍存在的问题:

  • 如何把这些算法实现有条不紊地安排在一个系统模块内规范实现?

  • 如何把这些算法需要做的特征工程做到系统化管理起来?

工程上, synthetic-data-generator 解决的就是以上两个问题,核心处理流程如下:

sequenceDiagram participant User participant DataConnector participant DataLoader participant Metadata participant Synthesizer participant DataProcessor participant Model participant Evaluator User->>DataConnector: create_connector() DataConnector-->>DataLoader: connector User->>DataLoader: load_data() DataLoader->>Metadata: from_dataloader() User->>Synthesizer: fit(metadata) Synthesizer->>DataProcessor: convert(data) DataProcessor-->>Synthesizer: processed_data Synthesizer->>Model: fit(metadata, processed_data) Model-->>Synthesizer: trained_model User->>Synthesizer: sample(n_samples) Synthesizer->>Model: generate() Model-->>Synthesizer: synthetic_data Synthesizer->>DataProcessor: reverse_convert(synthetic_data) DataProcessor-->>Synthesizer: restored_data Synthesizer-->>User: restored_data User->>Evaluator: evaluate(real_data, restored_data) Evaluator-->>User: evaluation_results

二、最简单的实例

在 pluggy 的定义中,有如下关键点:

  • 定义插件组标记:定义接口标记 hookspec 和接口实现标记 hookimpl,它们都属于一个插件组

  • 定义插件声明接口:通过接口标记 hookspec 定义接口

  • 定义插件接口实现:通过接口实现标记 hookimpl 定义接口实现

如下, 定义了一个插件组,名称为 myproject。整了一个类 MySpec 定义插件接口声明,以及两个插件 Plugin_1 和 Plugin_2 做了插件接口的实现,可以发现其上通过装饰器 @hookspec 和 @hookimpl 绑定在一起的。

在 python 中,对象可以作为装饰器使用。当对象用作装饰器时,会自动调用对象内部的 call 方法,这里就是 pluggy 做了相关的设计实现了。让我们不通过类的继承或实现来做插件,而是通过装饰器,这是 pluggy 的核心作用之一。

在 pluggy 中,插件组可以有多个,例如 myproject1、myproject2... 做插件组的实现时,只需引用对应插件组的 hookspec 和 hookimpl ,再实现对应的插件内容即可。

除了上述说的插件定义和实现工作,pluggy 还提供了 PluginManager 获取对应的插件组,从而可以批量执行插件的功能。

基于 pluggy 实现的一个最简单的插件系统如下,只需一个 python 文件,直接 copy ,直接运行。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pluggy

hookspec = pluggy.HookspecMarker("myproject")  # hook 标签 用于标记hook
hookimpl = pluggy.HookimplMarker("myproject")  # hook 实现标签 用于标记hook的一个或多个实现


class MySpec(object):
    """hook 集合"""

    @hookspec
    def myhook(self, arg1, arg2):
        pass

    @hookspec
    def my_hook_func1(self, arg1, arg2):
        pass

    @hookspec
    def my_hook_func2(self, arg1, arg2):
        pass

# 插件类
class Plugin_1(object):
    """hook实现类1"""

    @hookimpl
    def myhook(self, arg1, arg2):
        print("Plugin_1.myhook called")
        return arg1 + arg2

    @hookimpl
    def my_hook_func2(self, arg1, arg2):
        print("Plugin_1.my_hook_func2 called, args:", arg1, arg2)

    def my_hook_func3(self, arg1, arg2):
        print("Plugin_1.my_hook_func3 called, args:", arg1, arg2)


class Plugin_2(object):
    """hook实现类2"""

    @hookimpl
    def myhook(self, arg1, arg2):
        print("Plugin_2.myhook called")
        return arg1 - arg2

    @hookimpl
    def my_hook_func2(self, arg1, arg2):
        print("Plugin_2.my_hook_func2, args:", arg1, arg2)

# 初始化 PluginManager
pm = pluggy.PluginManager("myproject")

# 登记hook集合(hook函数声明)
pm.add_hookspecs(MySpec)

# 注册插件(hook函数实现)
pm.register(Plugin_1())
pm.register(Plugin_2())

# 调用自定义hook
results = pm.hook.myhook(arg1=1, arg2=2) # 调用两个插件类中的同名hook函数 # 后注册的插件中的函数会先被调用
print(results) # 输出 [-1, 3]

results = pm.hook.my_hook_func1(arg1="name", arg2="shouke")
print(results)

pm.hook.my_hook_func2(arg1="addr", arg2="sz")

三、实现内外部插件系统的实例

完整代码:https://github.com/jalr4ever/pluggy-demo-reader,具体运行见仓库 README。

目录结构:

.
├── README.md
├── extension
│   └── plugins
│       └── pgsql
│           ├── pyproject.toml
│           └── reader_pgsql.py
├── pyproject.toml
└── reader
    ├── __init__.py
    ├── hookspecs.py
    ├── plugins
    │   ├── __init__.py
    │   └── mysql
    │       ├── __init__.py
    │       └── reader.py
    └── runner.py

此例实现了一个多数据源读取器的插件系统 reader,每个插件都是个读插件,能力是读取不同的数据源的数据。和上一个实例不同的是,这个 reader_pgsql 是外部插件。 reader 打包的时候不会将 extension 包打进去。

看目录树,也即:reader = hookspecs (插件声明) + plugins(内部插件)。

外部插件在写的时候,需要引用 reader 依赖(就是其 wheel 包,毕竟需要获取插件声明),然后通过在 pyproject.toml 定义的 entrypoint 即可注册到 reader 系统中。最后,外部插件写完,也通过 pip install -e 外部插件,再次运行 reader 系统,即可获取到注册进来的外部插件。

这样做的好处是:外部插件在一些临时场景或者特例场景下,可以单独集成,不会影响到插件系统本身的维护与发布。

四、synthetic-data-generator 的插件实现

在最开始,我们已经说过: synthetic-data-generator = 内外部插件系统设计 + 自动扫描路径注册插件 + 实例化获取插件 。

与章节三不同的是,synthetic-data-generator 除了支持内外部插件,还额外设计了插件的 Manager 体系,用于做 自动扫描路径注册插件 + 实例化获取插件。

每个模块都有一个 extension.py 和 manager.py:

  • 声明工作 - 在 extension.py 定义插件组的声明,hookimpl 和 hookspec,但 hookspec 定义的接口只有一个,注册接口。

  • 类加载工作 - 在 manager.py 定义插件的插件组名称,继承了一个统一的 manager 基类,其定义以及加载「本地插件」和「外部插件」的方式,以及将加载后类载入到 pluggy 的 PluginManager。本地插件就是自己可以通过 Import 获取到的包,外部插件则是通过构建配置 toml 文件中定义的路径定义的。

插件具体使用的时候,是在功能类上获实例需要的模块 manager,从而获取插件的实例:

  • 实例化工作 - manager 实例化的实际会加载「本地插件」和「外部插件」,Manager 调用 init 插件方法,从而实例化所有插件

应用流程举例

  • Metadata 中会实例化 InspectorManager

  • Synthesizer 中会实例化 DataProcessorManager

  • ...

自动扫描路径注册插件:

设计了 Manager,其中让各模块 Manager 同包路径,利用 importlib 注册到 pluggy 中。其中做了 pm.register, 只有这个注册了,pluggy 才能获取到插件,将实现了 hookimpl 装饰器的插件注册进去

def _load_dir(self, module):
    """
    Import all python files in a submodule.
    """
    modules = glob.glob(join(dirname(module.__file__), "*.py"))
    sub_packages = (
        basename(f)[:-3] for f in modules if isfile(f) and not f.endswith("__init__.py")
    )
    packages = (str(module.__package__) + "." + i for i in sub_packages)
    for p in packages:
        self.pm.register(importlib.import_module(p))

实例化获取插件:

设计了 Manager,这个基类还提供 init 方法,用于传入类型,获取对应的插件实例,synthetic-data-generator 目前的调用是获取实例,依次调用的。synthetic-data-generator 提供了类名注册列表,只有写到注册列表的类才会被实例化:

def init(self, c, **kwargs: dict[str, Any]):
    """
    Init a new subclass of self.register_type.

    Raises:
        NotFoundError: if cls_name is not registered
        InitializationError: if failed to initialize
    """
    if isinstance(c, self.register_type):
        return c

    if isinstance(c, type):
        cls_type = c
    else:
        c = self._normalize_name(c)

        if not c in self.registed_cls:
            raise NotFoundError
        cls_type = self.registed_cls[c]
    try:
        instance = cls_type(**kwargs)
        if not isinstance(instance, self.register_type):
            raise InitializationError(f"{c} is not a subclass of {self.register_type}.")
        return instance
    except Exception as e:
        raise InitializationError(e)


0