Reproducible Machine Learning|可復現的機器學習發展、架構及基本工具使用概述
目錄
- What is Reproducible Machine Learning?
- Reproducible Machine Learning的發展與架構
- Reproducible Machine Learning 基本工具
- 代碼版本管理—— Git
- 數據版本管理—— DVC
- 實驗過程監控—— Weight & Biases
- 特征管理—— Feature Store
- 實驗參數設置—— Hydra
- 容器技術—— Docker
- 總結
- 參考資料
在真實工業場景下,訓練好的 Machine learning 模型通常會在幾個月之后出現精度變差的情況(由于新的數據與模型產生交互,導致數據出現異常值或者數據分布改變等原因導致),行業內稱之為Concept drift.
Your model’s accuracy will be at its best until you start using it. It then deteriorates as the world it was trained to predict changes. This phenomenon is called concept drift, and while it’s been heavily studied in academia for the past two decades, it’s still often ignored in industry best practices.
這其實是 Machine Learning 領域長期以來欠下的“技術債”。事實上,當前開發領域的人們討論軟件時,通常會先設計一個基本的基于 Devops 的架構,保證實現持續集成(Continuous integration,CI)和持續交付(Continuous delivery,CD)的基本功能。一個好的架構是很重要的,否則在未來添加新功能會變得更慢,成本更高。在工業開發領域也有人提出了一種被稱為 MLops 的新的開發范式,即機器學習時代的 Devops。

而在學術領域中,新的更大的數據集發布后,原來的模型大多數也會出現精度急劇下降的問題,這就需要進行算法的進一步優化。因此,可復現機器學習(Reproducible Machine Learning,RML)的概念也應運而生。
1. What is Reproducible Machine Learning?
Reproducible Machine Learning,顧名思義,即為可復現的機器學習。考慮到工業界和學術界數據迭代速度的巨大差別,RML在實際實踐上其實和MLops有很大區別。MLops是一種聚焦于軟件開發架構的工業環境下進行合作開發和快速迭代上線的能力的標準算法開發范式。而RML則是一種解決實驗室環境下不同機器合作開發、算法移植、實驗記錄和依據新增加的數據進行模型迭代的算法開發范式。簡而言之就是,RML的要求比MLops低很多,同時主要框架參考自MLops。MLops中提出的一些Serverless,Continuous training的概念及工具,RML都沒有涉及。其實也可以理解,實驗室環境中畢竟算法研究工作者比較多,開發比較少,產品經理近似于沒有,同時不需要進行基于數據的快速的迭代,只需要在不同師兄弟之間可以實現模型的復現和實驗記錄就可以了。因此RML需要掌握的技能點也比較簡單,主要包括代碼版本管理(Git), 數據版本管理 (DVC),實驗過程監控 (Weight & Biases),特征管理(Feature Store),實驗參數設置 (Hydra),以及容器技術(Docker)等。

PS:還有一個概念叫做 Continuous Machine Learning,其比 MLops 更強,其通過更新的數據監測模型推理結果,同時基于這樣的結果實現模型的自動優化訓練。CML 的目標是架設一種可持續的模擬人類連續獲取和微調信息的能力的。其相當于是基于 MLops 范式架構做 AutoML,力圖實現 Continuous integration,Continuous deliver,Continuous training (CT)的新的自動化機器學習算法來解決 Concept drift 的問題。
2. Reproducible Machine Learning的發展與架構
一般來說實驗室主要的目光還是聚焦于可以實現并訓練一個機器學習模型,該模型在給定用例的相關訓練數據的情況下,可在測試集上實現出色的預測性能。但是,由于缺乏有效的代碼管理和數據管理,當出現師兄畢業,服務器宕機等問題之后,難以實現原來的最優的實驗結果。同時每次數據集更新后,需要缺乏對于上次實驗的過程記錄,只能依賴于最后一次的超參數結果,近乎等于從頭開始調參。更官方的說,真正的挑戰不是構建機器學習模型,而是構建集成的機器學習系統以及在生產環境中持續運行該系統。如同谷歌在 The High Interest Credit Card of Technical Debt 提出的,在實際的機器學習系統中,只有一小部分是由機器學習代碼組成的。所需的相關元素既龐大又復雜。

用圖示來表示,目前大多數實驗室的訓練流程如下所示,也就是仍然處于 MLOps 級別 0:手動過程。
PS:甚至有時候訓練模型沒來得及記錄,四舍五入變成了 MLOps 級別 -1:反復手動過程
用 MLOps 級別 0 的總結,這種過程有如下特點:
- 腳本驅動的交互式手動過程:每個步驟(包括數據分析、數據準備、模型訓練和驗證)都是手動的。該過程需要手動執行每個步驟,并且手動從一個步驟轉到另一個步驟。此過程通常由數據科學家以交互方式在筆記本中編寫和執行的實驗性代碼驅動,直到生成有效的模型為止。
- 機器學習與操作分離:該過程會將創建模型的數據科學家與將模型用作預測服務的工程師分開。數據科學家將經過訓練的模型作為工件移交給工程團隊,以便在其 API 基礎架構上進行部署。此移交工作可能包括將經過訓練的模型放在存儲位置、將模型對象簽入代碼庫,或者將其上傳到模型注冊表。然后,部署模型的工程師需要在生產環境中提供所需的功能以實現低延時服務,這可能會導致訓練-應用偏差。
- 不頻繁發布迭代:該過程假定您的數據科學團隊管理一些不會頻繁更改(更改模型實現或使用新數據重新訓練模型)的模型。新模型版本每年僅部署幾次。
- 無 CI:由于假定幾乎不更改實現,因此 CI 已被忽略。通常,測試代碼是筆記本或腳本執行的一部分。實現實驗步驟的腳本和筆記本由源代碼控制,并生成經過訓練的模型、評估指標和可視化等工件。
- 無 CD:由于不會頻繁部署模型版本,因此不考慮 CD。
- 部署指的是預測服務:該過程僅涉及將經過訓練的模型部署為預測服務(例如,具有 REST API 的微服務),而不是部署整個機器學習系統。
- 缺少主動性能監控:該過程不會跟蹤或記錄模型預測和操作,模型預測和操作是檢測模型性能下降和其他模型行為偏移所必需的信息。

這些特點有些我覺得其實還可以接受,不是很影響算法的迭代。但是有卻是真實存在的影響生產力的問題:
- 腳本驅動的交互式手動過程:一次一般只能測一組參數,還得盯著結果,影響調參速度
- 機器學習與操作分離:不設置Feature Store,新數據集發布后又要重新調參
- 無 CI/CD:缺乏代碼版本管理,無法進行合作開發;
- 缺少主動性能監控:實驗過程中一般只用命令行顯示和記錄,沒有有效的進行可視化,導致調參困難;
因此我們需要針對現有的問題進行改進,考慮到 MLops 2級里加了一些快速進行 CI/CD 的功能,實驗室環境下部署那些內容消耗大量精力但是獲得的回報較少,因為也不需要那么快的迭代。因此當前主流推行的RML我認為其實是一種 MLops 1 級的架構。

這種架構中使用 Hydra 等工具實現多次訓練和結果記錄,增設了 Source repository 用來做代碼管理和數據管理,增設了 Feature Store 用于下一次新的數據集更新后的特征選擇/網絡架構選擇,增設了可以可視化的 Performance monitoring,基本可以解決當前的主要矛盾。考慮到實驗室環境下不同算法研究者間的代碼移植,RML 的主要框架中還增設了 Dockerization 的操作,可以實現不同操作系統,文件系統,用戶間的代碼移植。
3. Reproducible Machine Learning 基本工具
代碼版本管理—— Git
現在的話,實際上很多實驗室都在 Github 上建立了自己的倉庫,因此 Git 也就成為了 RML 里最基本的工具之一。

- 初始化 Git 倉庫
要實現基于 Git 的代碼版本管理,首先需要初始化一個倉庫。
$ git config --global user.name "Vaew" $ git config --global user.email Vaew@example.com
這里有一個值得注意的就是 Github 上可以免費建立私有倉庫了,因此部分處于某些原因無法開源代碼的研究工作者也不必擔心傳到 Github 上會造成代碼泄漏的問題。
- 添加文件
$ git add
- 檢查狀態
$ git status
- Commit消息
$ git commit -m "Message"
- 更新代碼庫
$ git pull
上述是一些 Git 的基本的 CRUD 操作。Git 其實有一個非常有用的功能就是它可以設置很多個不同的 Branch,比如說,你已經決定要解決前幾天遇到的 #53 問題。想要新建一個分支并同時切換到那個分支上,你可以運行一個帶有 -b 參數的 git checkout 命令:
$ git checkout -b iss53 Switched to a new branch "iss53"
它是下面兩條命令的簡寫:
$ git branch iss53 $ git checkout iss53
更多的操作可以參考:https://git-scm.com/
數據版本管理—— DVC

DVC 其實就是 Data version control 的簡稱,它的基本操作和 Git 類似,是主流的數據版本管理工具。其可以直接基于 S3, Google cloud, Azure, Alibaba cloud, SSH server 甚至是本地物理機控制數據版本。其安裝也非常簡單,只需要一行:
$ pip install dvc
- 初始化
由于 DVC 基于 Git 之上運行,因此需要先進行 Git 的初始化:
$ git init $ dvc init $ git commit -m "Initialize DVC"
路徑下會產生 .dvc/.gitignore 、 .dvc/cache/ 、 . dvc/config,其中最重要的是.dvc/cache/,DVC 會在這里建立檔案的聯結,也是最終會 push 到云端的文件。
- 將模型文件夾加入 DVC 進行版本控制
$ dvc add model
完成后會產生相對應的 .dvc 文件:
dvc-test ├── main.py ├── model │ └── my_model.h5 └── model.dvc
下面以一個模型訓練的完整例子來說明 DVC 工具的使用。
- 數據分割:
$ dvc run -d 要執行的程序或者輸入的檔案 -o 要輸出的檔案 python 要執行的程序 $ dvc run -d script/split_train_test.py -d script/config.py -d dataset/annotation.csv -o dataset/train.csv -o dataset/test.csv python script/split_train_test.py
執行完成后 DVC 文件變化:
$ git status -s ?? train.csv.dvc $ cat train.csv.dvc md5: c53dd6d76f7cdc25aaf2146db6223bf0 cmd: python script/split_train_test.py wdir: . deps: - md5: 826ea439f28fb04923de739af8c26b5d path: script/split_train_test.py - md5: a933ce5d996b1687817a60b3453e18ed path: script/config.py - md5: 87c46f0402b54b960b294ef7791f7cf8 path: dataset/annotation.csv outs: - md5: 896389741ff20a2055acfe5c65893bf1 path: dataset/train.csv cache: true metric: false persist: false - md5: 1ea600a9a7b720cd28fe99ff6d1c3e70 path: dataset/test.csv cache: true metric: false persist: false
我們可以發現:
- DVC 以第一個
-o的檔案去加上.dvc變成新的檔案 - 由于有兩個輸出檔案
test.csv和train.csv,因此都被記錄在train.csv.dvc中 - md5 會記錄檔案在
.dvc/config中的位置,而每一份檔案對于 DVC 來說是建立與原來檔案的鏈接,而非建立新的檔案
$ du -sh .dvc/cache/89/* .dvc/cache/1e/* 60K .dvc/cache/89/6389741ff20a2055acfe5c65893bf1 16K .dvc/cache/1e/a600a9a7b720cd28fe99ff6d1c3e70 $ du -sh dataset/train.csv dataset/test.csv 60K dataset/train.csv 16K dataset/test.csv
最后我們用 Git 管理檔案:
$ git add . $ git commit -m "split data"
- 模型訓練
$ mkdir log $ mkdir model $ dvc run -d script/train.py -d script/config.py -d dataset/train.csv -d dataset/test.csv -o model/model.pth -o log/train-output.txt -o log/test-output.txt python script/train.py $ git add . $ git commit -m "train model"
- 模型評估
值得注意的是,輸出的 eval.txt 檔案,是要用 -M 而並非 -o ,原因是 DVC 后續會去追蹤這份檔案,讓我們能夠快速進行比較。又因為這樣的話沒有輸出檔案因此可以用 -f 的方式來指定輸出的檔案。
$ dvc run -d script/evaluate.py -d script/config.py -d dataset/test.csv -d model/model.pth -M log/eval.txt -f Dvcfile python script/evaluate.py
執行完后,會建立 log/eval.txt以及 Dvcfile ,取名文Dvcfile是因為之后做 dvc repro 時候如果沒有指定的檔案,則會使用 default 的檔案,而 default 檔案名字是 Dvcfile。
PS:Dvcfile 和上面步驟的 .dvc 檔案是一樣的,記載了 md5 、 cache 的信息。由于我們剛剛使用了 -M 輸出 eval.txt 檔案,此處可以使用dvc metrics show查看模型表現:
$ dvc metrics show log/eval.txt: input size: 224 classes number: 7 use pretrained: True epochs: 21 batch size: 32 learning rate: 0.001 momentum: 0.9 accuracy: 0.8626760563380281 $ cat log/eval.txt input size: 224 classes number: 7 use pretrained: True epochs: 21 batch size: 32 learning rate: 0.001 momentum: 0.9 accuracy: 0.8626760563380281 $ git add . $ git commit -m "evaluation"
- Reproduce
假設想要將 epochs 21 修改為 51:
$ git checkout -b epochs51 $ vi script/config.py # 將epochs = 21 改為 epochs = 51 $ dvc repro
由于一開始進行了有效的記錄,因此 DVC 就可以很快地做到自動化復現。
$ dvc metrics show -a epochs51: log/eval.txt: input size: 224 classes number: 7 use pretrained: True epochs: 21 batch size: 32 learning rate: 0.001 momentum: 0.9 accuracy: 0.8661971830985915 master: log/eval.txt: input size: 224 classes number: 7 use pretrained: True epochs: 21 batch size: 32 learning rate: 0.001 momentum: 0.9 accuracy: 0.8661971830985915
- Merge the model to master
為了能夠練習 merge ,因此我多訓練了一組沒有使用 pretrained model 的模型(先切回 master 再 checkout -b)
$ git checkout master $ git checkout -b without_pretrained $ vi script/config.py # 將usePretrained = True 改為 False $ dvc repro $ git add . $ git commit -m "without pretrained" $ dvc metrics show -a epochs51: log/eval.txt: input size: 224 classes number: 7 use pretrained: True epochs: 51 batch size: 32 learning rate: 0.001 momentum: 0.9 accuracy: 0.8626760563380281 master: log/eval.txt: input size: 224 classes number: 7 use pretrained: True epochs: 21 batch size: 32 learning rate: 0.001 momentum: 0.9 accuracy: 0.8661971830985915 wo_pretrained: log/eval.txt: input size: 224 classes number: 7 use pretrained: False epochs: 21 batch size: 32 learning rate: 0.001 momentum: 0.9 accuracy: 0.30633802816901406
若我們要將 without_pretrained 以及 epochs51 merge 在一起,直接 merge 會出現很多沖突:
$ git merge epochs51 Auto-merging train.csv.dvc CONFLICT (content): Merge conflict in train.csv.dvc Auto-merging script/config.py CONFLICT (content): Merge conflict in script/config.py Auto-merging model.pth.dvc CONFLICT (content): Merge conflict in model.pth.dvc Auto-merging log/eval.txt CONFLICT (content): Merge conflict in log/eval.txt Auto-merging Dvcfile CONFLICT (content): Merge conflict in Dvcfile Automatic merge failed; fix conflicts and then commit the result.
因此需要進行手動的調整,以 train.csv.dvc 為例:
<<<<<<< HEAD md5: 8d2bb1a5dd7542df80a684cf04179444 ======= md5: 0a72f2ce076e4d8f5ba6cd476d1cb464 >>>>>>> epochs51 cmd: python script/split_train_test.py wdir: . deps: - md5: 826ea439f28fb04923de739af8c26b5d path: script/split_train_test.py <<<<<<< HEAD - md5: acb7b60a7d2f31a2ea5c527731f3b5f7 ======= - md5: 1ffb468a9e98a8248f5071286cfd0111 >>>>>>> epochs51 path: script/config.py - md5: 87c46f0402b54b960b294ef7791f7cf8 path: dataset/annotation.csv outs: - md5: 896389741ff20a2055acfe5c65893bf1 path: dataset/train.csv cache: true metric: false persist: false - md5: 1ea600a9a7b720cd28fe99ff6d1c3e70 path: dataset/test.csv cache: true metric: false persist: false
需要改成:
md5: 0a72f2ce076e4d8f5ba6cd476d1cb464 cmd: python script/split_train_test.py wdir: . deps: - md5: 826ea439f28fb04923de739af8c26b5d path: script/split_train_test.py - md5: 1ffb468a9e98a8248f5071286cfd0111 path: script/config.py - md5: 87c46f0402b54b960b294ef7791f7cf8 path: dataset/annotation.csv outs: - md5: 896389741ff20a2055acfe5c65893bf1 path: dataset/train.csv cache: true metric: false persist: false - md5: 1ea600a9a7b720cd28fe99ff6d1c3e70 path: dataset/test.csv cache: true metric: false persist: false
修改完后輸入 dvc checkout,目的是為了將 DVC 控制的 pipeline 切回剛剛設定的,並且重新 reproduce 。
$ dvc checkout $ dvc repro $ git add . $ git commit -m "merge without_pretrained and epochs51"
最后再切回 master 去 merge 原本的 branch :
$ git checkout master $ git merge wo_pretrained Fast-forward Dvcfile | 8 ++++---- log/eval.txt | 4 ++-- model.pth.dvc | 12 ++++++------ script/config.py | 2 +- script/train.py | 1 - train.csv.dvc | 4 ++-- 6 files changed, 15 insertions(+), 16 deletions(-)
- 將數據上傳到云
以 Google Cloud 為例,首先安裝套件:
$ pip install dvc[gs]
建立 Bucket 后,將設定好的 gcp-test.json 加入 .dvc/config 中:
$ dvc remote add -d upstream gs://vaew-bucket/ $ dvc remote modify upstream credentialpath gcp-test.json $ dvc push Preparing to upload data to 'gs://vaew-bucket/' Preparing to collect status from gs://vaew-bucket/ Collecting information from local cache... [##############################] 100% Collecting information from remote cache... [##############################] 100 [##############################] 100% Analysing status [##############################] 100% log/test-output.txt [##############################] 100% dataset/test.csv [##############################] 100% log/train-output.txt [##############################] 100% dataset/train.csv [##############################] 100% model/model.pth
DVC 會將 cache 里的資料全部上傳上去。
- 獲取數據
獲取數據可以直接使用 Git
$ git clone https://github.com/vaew/xxx.git $ dvc pull
實驗過程監控—— Weight & Biases
實驗過程監控有很多工具可以使用:Weights & Biases、MlFlow、Neptune、Comet.ml ,這里我們僅介紹 Weight & Biases。總的來說,W&B 提供了 4 個有用的工具:
- Dashboard: 實驗跟蹤
- Artifacts: 數據集版本控制、模型版本控制
- Sweeps: 超參數優化
- Reports: 保存和共享可重現的結果
此處以簡單 Keras 分類器模型為例,首先我們進行安裝和初始化:
$ pip install wandb -q $ wandb login wandb: You can find your API key in your browser here: https://wandb.ai/authorize wandb: Paste an API key from your profile and hit enter:
然后我們在 Jupyter Notebook 創建一個新的項目,并設置如下超參數:
project_name = 'first_steps'
group_name = 'cnn'
experiment_name = '2_conv'
config_dict = {
"conv_1": 16,
"activation_1": "relu",
"kernel_size": (3, 3),
"pool_size": (2, 2),
"dropout": 0.7,
"conv_2": 32,
"activation_out": "softmax",
"optimizer": "adam",
"loss": "sparse_categorical_crossentropy",
"metric": "accuracy",
"epoch": 6,
"batch_size": 32
}
wandb.init(
project=project_name,
group=group_name,
name=experiment_name,
config=config_dict
)
config = wandb.config
config 是一個帶有超參數的字典。您還可以加載 .yaml 格式的配置文件。 wandb.init 在 W&B 中創建一個新的運行并啟動后臺進程同步數據。接下來我們進行數據的加載并定義一個簡單的 CNN 模型:
import tensorflow as tf
from tensorflow.keras.callbacks import Callback
from wandb.keras import WandbCallback
import numpy as np
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data() ##data download
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
x_train, y_train = x_train[::5], y_train[::5]
class_names = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
def cnn_mnist(config, num_classes = 10, input_shape = (28, 28, 1)): ##simple Keras CNN
img_inputs = tf.keras.Input(shape=input_shape)
conv_1 = tf.keras.layers.Conv2D(config.conv_1, kernel_size=config.kernel_size, activation=config.activation_1)(img_inputs)
pool_1 = tf.keras.layers.MaxPooling2D(pool_size=config.pool_size)(conv_1)
conv_2 = tf.keras.layers.Conv2D(config.conv_2, kernel_size=config.kernel_size, activation=config.activation_1)(pool_1)
pool_2 = tf.keras.layers.MaxPooling2D(pool_size=config.pool_size)(conv_2)
flatten = tf.keras.layers.Flatten()(pool_2)
dropout = tf.keras.layers.Dropout(config.dropout)(flatten)
dense_out = tf.keras.layers.Dense(num_classes, activation=config.activation_out)(dropout)
model = tf.keras.models.Model(inputs=img_inputs, outputs=dense_out)
model.compile(loss=config.loss, optimizer=config.optimizer, metrics=[config.metric])
return model
our_model = cnn_mnist(config)
our_model.fit(x_train, y_train, epochs=config.epoch, batch_size=config.batch_size,
validation_data=(x_test, y_test),
callbacks=[wandb.keras.WandbCallback(data_type="image",
labels=class_names)])
wandb.finish()
之后我們可以在 DashBoard 中查看我們的結果。Sweeps 是一種用于超參數和模型優化的工具,其詳細的優化機制如使用 Optuna 框架進行高效的超參數優化介紹。首先定義超參數優化策略:
# Configure the sweep – specify the parameters to search through, the search strategy, the optimization metric et all.
sweep_config = {
'method': 'random', #grid, random
'metric': {
'name': 'accuracy',
'goal': 'maximize'
},
'parameters': {
'epoch': {
'values': [5, 10]
},
'dropout': {
'values': [0.3, 0.4, 0.5]
},
'conv_1': {
'values': [16, 32, 64]
},
'conv_2': {
'values': [16, 32, 64]
},
'optimizer': {
'values': ['adam', 'nadam', 'sgd', 'rmsprop']
},
'activation_1': {
'values': ['relu', 'elu', 'selu','sigmoid']
},
'kernel_size': {
'values': [(3, 3), (5, 5), (7, 7)]
},
}
}
然后讓我們創建一個 sweep 并定義一個 train 函數。sweep 使用每組超參數的時候會調用此函數。
sweep_id = wandb.sweep(sweep_config, project="first_steps")
def train():
# Default values for hyper-parameters we're going to sweep over
config_defaults = {
"conv_1": 32,
"activation_1": "relu",
"kernel_size": (3, 3),
"pool_size": (2, 2),
"dropout": 0.1,
"conv_2": 64,
"activation_out": "softmax",
"optimizer": "adam",
"loss": "sparse_categorical_crossentropy",
"metric": "accuracy",
"epoch": 6,
"batch_size": 32
}
# Initialize a new wandb run
wandb.init(config=config_defaults)
# Config is a variable that holds and saves hyperparameters and inputs
config = wandb.config
model = cnn_mnist(config=config)
model.fit(x_train, y_train, epochs=config.epoch, batch_size=config.batch_size,
validation_data=(x_test, y_test),
callbacks=[wandb.keras.WandbCallback()])
wandb.agent(sweep_id, train)
得到如下輸出:


除了 Sweeps 和 Dashboard,W&B 還提供了一個名為 Artifacts 的實用工具,可以實現數據和模型的記錄(有點類似于上面的 DVC)。首先加載一個原始數據集,然后創建一個新的 Artifact。
from collections import namedtuple
Dataset = namedtuple("Dataset", ["x", "y"])
def load_data_split(train_size=50_000):
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_val = x_train[:train_size], x_train[train_size:]
y_train, y_val = y_train[:train_size], y_train[train_size:]
training_data = Dataset(x_train, y_train)
validation_data = Dataset(x_val, y_val)
test_data = Dataset(x_test, y_test)
datasets = [training_data, validation_data, test_data]
return datasets
def load_and_log():
with wandb.init(project=project_name, job_type="load-data") as run:
datasets = load_data_split()
names = ["training", "validation", "test"]
# Artifact
raw_data = wandb.Artifact(
"mnist-raw", type="dataset",
description="Raw MNIST dataset, splitted",
metadata={"source": "keras.datasets.mnist",
"train_data": len(datasets[0].x),
"valid_data": len(datasets[1].x),
"test_daata": len(datasets[2].x)})
for name, data in zip(names, datasets):
# Save our datasets
with raw_data.new_file(name + ".npz", mode="wb") as file:
np.savez(file, x=data.x, y=data.y)
#save Artifact
run.log_artifact(raw_data)
load_and_log()
一個輸出 Artifact 會包含如下內容:
- 元數據—— 當前 Artifact 的描述。在我們的例子中,它是我們數據集的描述(來源,分割大小)
- 文件——數據集
- 圖形視圖—— Artifacts 圖表(輸入、輸出、過程)
我們添加一個新的 Artifact,它將描述數據預處理:
def preprocess_dataset(dataset, normalize=True, expand_dims=True, to_categorical=True):
x, y = dataset.x, dataset.y
if normalize:
x = x.astype("float32") / 255
if expand_dims:
x = np.expand_dims(x, -1)
if to_categorical:
y = tf.keras.utils.to_categorical(y, 10) # Hardcoded num_classes
return Dataset(x, y)
import os
def preprocess_and_log(preprocess_steps):
with wandb.init(project=project_name, job_type="data_preprocessing", name="preprocess_simple") as run:
processed_data = wandb.Artifact(
"mnist-preprocessed", type="dataset",
description="Preprocessed MNIST dataset",
metadata=preprocess_steps)
# which Artifact we will use
raw_data_artifact = run.use_artifact('mnist-raw:latest')
# download Artifact
raw_dataset = raw_data_artifact.download()
for split in ["training", "validation", "test"]:
datafile = split + ".npz"
data = np.load(os.path.join(raw_dataset, datafile))
raw_split = Dataset(x=data["x"], y=data["y"])
processed_dataset = preprocess_dataset(raw_split, **preprocess_steps)
with processed_data.new_file(split + ".npz", mode="wb") as file:
np.savez(file, x=processed_dataset.x, y=processed_dataset.y)
run.log_artifact(processed_data)
steps = {"normalize": True,
"expand_dims": True,
"to_categorical" : True}
preprocess_and_log(steps)
我們現在有 2 個 Artifacts:“mnist-raw”和“mnist-preprocessed”。矩形是輸入/輸出 Artifacts,圓形是 Artifacts 之間的過程。在圖表視圖的幫助下,可以輕松跟蹤 pipeline 在工作過程中發生的變化。

Reports 功能則是可以直接從網站導出報告。
特征管理—— Feature Store

首先我們引用 Amazon SageMaker Feature Store 中對特征的定義,對特征做一個解釋:
特征是指在訓練和推斷期間用來進行預測的屬性或特性模型。例如,在推薦音樂播放列表的機器學習應用程序中,特征可能包括歌曲評分、播放歷史以及播放時長。機器學習模型的精確度基于特征的精確集合和組成。通常,這些特征會被訓練多個模型的多個團隊重復使用。而且。用于訓練模型的任何特征集都要可用于進行實時預測(推理)。在這些不同的訪問模式中保持一個統一且最新的特征來源是一項挑戰,因為大多數組織會保留兩個不同的特征存儲庫,一個用于訓練,另一個用于推理。
實際上,目前很多結構化數據的比賽強依賴于數據的特征工程,如 Prof. Andrew Ng 所言
Coming up with features is difficult, time-consuming, requires expert knowledge. ‘Applied machine learning’ is basically feature engineering.
考慮到很多情況下相似結構的數據會有一些類似的強特征,因此建立 Feature Store 就像建立 ACM 選手的板子一樣,是一件能夠最小化特征工程精力的事情。實際上當前時序數據的tsfresh就是基于這樣的思路來自動抽取模型的特征。特征存儲是驅動高可用ML模型的數據處理部分的中心樞紐。其將原始數據轉換為特征值,存儲這些值,并將其用于模型訓練和在線預測。通過自動化這些步驟,特征存儲使得我們在數小時內針對新的數據構建和部署新的模型。
由于 AWS 關于 Feature Store 的操作很詳細,而且過程也很簡單,這里直接給出鏈接:https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-use-with-studio.html
更多關于 Feature Store 的資料可以參考:https://www.featurestore.org/
實驗參數設置—— Hydra
Facebook Hydra 允許開發人員通過編寫和覆蓋配置來簡化 Python 應用程序(尤其是機器學習方面)的開發。開發人員可以借助Hydra,通過更改配置文件來更改產品的行為方式,而不是通過更改代碼來適應新的用例。Hydra 提供了一種靈活的方法來開發和維護代碼及配置,從而加快了機器學習研究等領域中復雜應用程序的開發。它允許開發人員從命令行或配置文件“組合”應用程序的配置。這解決了在修改配置時可能出現的問題,例如:
- 維護配置的稍微不同的副本或添加邏輯以覆蓋配置值。
- 可以在運行應用程序之前就組成和覆蓋配置。
- 動態命令行選項卡完成功能可幫助開發人員發現復雜配置并減少錯誤。
- 可以在本地或遠程啟動應用程序,使用戶可以利用更多的本地資源。
Hydra 其他好處包括:
- 為新用例和需求的項目添加功能變得更加容易,而無需重寫大量代碼。
- 減少了復雜應用程序中常見的一些樣板代碼,例如處理配置文件,配置日志記錄和定義命令行標志。
Hydra 的安裝:
$ pip install --upgrade hydra-core
Hydra 簡單用例
- 減少配置文件代碼量
其可以可以極大程度減少配置代碼的代碼量,下面給出一個簡單的例子:
import hydra
@hydra.main()
def app(cfg):
print(cfg.pretty())
print("The user is : " + cfg.user)
if __name__ == "__main__":
app()
運行如下:
$ python3 test_hydra.py +user=ua +pwd=pa
輸出如下:
Use OmegaConf.to_yaml(cfg) category=UserWarning, user: vaew pwd: vaew The user is : vaew
- 簡化參數處理
常見的一個機器學習程序中,是用如下代碼來處理輸入和各種參數。
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
help='learning rate (default: 0.01)')
通過示例代碼我們可以看出來,在 hydra 之中,我們直接使用 cfg.user 就可以。
而且還可以通過配置文件來直接處理參數,比如:
@hydra.main(config_path="conf", config_name="config") def my_app(cfg: DictConfig) -> None: print(OmegaConf.to_yaml(cfg))
- 輸出目錄
人們在做研究時經常遇到的一個問題是如何保存輸出。典型的解決方案是傳入一個指定輸出目錄的命令行標志,但是當你希望同時運行多項任務,并且必須為每個任務傳遞不同的輸出目錄時,這尤其令人惱火。Hydra 通過為每次運行生成輸出目錄,并在運行代碼之前更改當前工作目錄來解決此問題。這樣可以很好地將來自同一 sweep 的任務分組在一起,同時保持每個任務與其他任務的輸出分離。我們可以簡單的來看看目錄的變化,可以看到,在當前目錄下生成了一個 outputs 目錄。其內部組織是按照時間來進行,把每次運行的輸出,log 和配置都歸類在一起。
├── outputs │ └── 2021-03-21 │ ├── 11-52-35 │ │ ├── .hydra │ │ │ ├── config.yaml │ │ │ ├── hydra.yaml │ │ │ └── overrides.yaml │ │ └── test_hydra.log │ └── 11-57-55 │ ├── .hydra │ │ ├── config.yaml │ │ ├── hydra.yaml │ │ └── overrides.yaml │ └── test_hydra.log ├── test_hydra.py
- 配置所在
我們分別打開兩個 .hydra 目錄下的 config.yaml 文件看看。可以看到,每次運行時候,對應的參數配置都保存在其中。這樣極大的方便了用戶的比對和分析。
$ cat outputs/2021-03-21/11-52-35/.hydra/config.yaml user: ua pwd: pa $ cat outputs/2021-03-21/11-57-55/.hydra/config.yaml user: ub pwd: pb
Multirun 處理組合情況
Multirun 是 Hydra 的一種功能,它可以多次運行你的函數,每次都組成一個不同的配置對象。這是一個自然的擴展,可以輕松地組合復雜的配置,并且非常方便地進行參數掃描,而無需編寫冗長的腳本。
例如,對于兩種參數,我們可以掃描所有 4 個組合,一個命令就是會完成所有組合的執行:
$ python test_hydra.py --multirun user=ua,ub pwd=pa,pb
得到輸出如下:
[2021-03-27 11:57:54,435][HYDRA] Launching 4 jobs locally [2021-03-27 11:57:54,435][HYDRA] #0 : +user=ua +pwd=pa user: ua pwd: pa [2021-03-27 11:57:54,723][HYDRA] #1 : +user=ua +pwd=pb user: ua pwd: pb [2021-03-27 11:57:54,992][HYDRA] #2 : +user=ub +pwd=pa user: ub pwd: pa [2021-03-27 11:57:55,248][HYDRA] #3 : +user=ub +pwd=pb user: ub pwd: pb
可以看到生成如下目錄樹,每個參數組合對應了一個目錄。
├── multirun │ └── 2021-03-27 │ └── 11-57-53 │ ├── 0 │ │ ├── .hydra │ │ │ ├── config.yaml │ │ │ ├── hydra.yaml │ │ │ └── overrides.yaml │ │ └── test_hydra.log │ ├── 1 │ │ ├── .hydra │ │ │ ├── config.yaml │ │ │ ├── hydra.yaml │ │ │ └── overrides.yaml │ │ └── test_hydra.log │ ├── 2 │ │ ├── .hydra │ │ │ ├── config.yaml │ │ │ ├── hydra.yaml │ │ │ └── overrides.yaml │ │ └── test_hydra.log │ ├── 3 │ │ ├── .hydra │ │ │ ├── config.yaml │ │ │ ├── hydra.yaml │ │ │ └── overrides.yaml │ │ └── test_hydra.log │ └── multirun.yaml
多線程運行
Python subprocess 允許你去創建一個新的進程讓其執行另外的程序,并與它進行通信,獲取標準的輸入、標準輸出、標準錯誤以及返回碼等。subprocess 模塊中定義了一個 Popen 類,通過它可以來創建進程,并與其進行復雜的交互。Popen 是 subprocess 的核心,子進程的創建和管理都靠它處理。
構造函數:
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0,restore_signals=True, start_new_session=False, pass_fds=(),*, encoding=None, errors=None)
常用參數:
- args:shell 命令,可以是字符串或者序列類型(如:list,元組)
- bufsize:緩沖區大小。當創建標準流的管道對象時使用,默認 -1。0:不使用緩沖區 1:表示行緩沖,僅當 universal_newlines=True 時可用,也就是文本模式 正數:表示緩沖區大小 負數:表示使用系統默認的緩沖區大小。
- stdin, stdout, stderr:分別表示程序的標準輸入、輸出、錯誤句柄
- preexec_fn:只在 Unix 平臺下有效,用于指定一個可執行對象(callable object),它將在子進程運行之前被調用
- shell:如果該參數為 True,將通過操作系統的 shell 執行指定的命令。
- cwd:用于設置子進程的當前目錄。
- env:用于指定子進程的環境變量。如果 env = None,子進程的環境變量將從父進程中繼承。
具體例子:
下面例子很簡陋,不能直接運行,只是給大家演示下大致思路,還請根據具體情況做相關調整。
- 我們通過 subprocess.Popen 啟動了 spark;
- hydra 的輸入 可以轉換為 spark 和 python 的輸入;
- 然后讀取子進程的 stdout;
- 逐次使用 log.info 來打印轉發的 stdout,這樣 spark 的輸出就被轉發到了 hydra 的輸出之中;
這樣,spark 的輸出就可以被 hydra 捕獲,從而整合到 hydra log 體系之中。
import shlex
import subprocess
import hydra
import logging
log = logging.getLogger(__name__)
@hydra.main()
def app(cfg):
# 可以在這里事先處理參數,被 hydra 處理之后,也成為 spark 和 python 的輸入,進行處理
shell_cmd = 'spark-submit cut_words.py' + cfg.xxxxxx # 假如 cut_words 有參數
cmd = shlex.split(shell_cmd)
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while p.poll() is None:
line = p.stdout.readline()
line = line.strip()
if line:
log.info('Subprogram output: [{}]'.format(line))
if p.returncode == 0:
log.info('Subprogram success')
else:
log.info('Subprogram failed')
if __name__ == '__main__':
app()
流程示例:
以下就是我采取辦法的流程示例。
- Input 由 hydra 處理之后,由 python 父進程轉發給 spark 和 Business Python;
- 具體 spark 的輸出,由 python 父進程轉發給 Hydra logging;
具體如下圖:
Input Input Hydra +----------+ +------------------------v | ^ | | | | | | | +-------------------------+ v | | | | +------+-------------+ | v +----------> | | Spark | | | | | | Parent Python Process | | Business Python | | | | | | +<-----------^ | | | | | | | | | +-------------------------+ +------+-------------+ | | | | | | | | | Hydra <---------<+ +------------------------+ Logging Output
容器技術—— Docker
Docker 屬于 Linux 容器的一種封裝,提供簡單易用的容器使用接口,其可以為模型在別的 PC 上的運行提供提供一次性的環境。它是目前最流行的 Linux 容器解決方案。Docker 將應用程序與該程序的依賴,打包在一個文件里面。運行這個文件,就會生成一個虛擬容器。程序在這個虛擬容器里運行,就好像在真實的物理機上運行一樣。有了 Docker,就不用擔心環境問題。總體來說,Docker 的接口相當簡單,用戶可以方便地創建和使用容器,把自己的應用放入容器。容器還可以進行版本管理、復制、分享、修改,就像管理普通的代碼一樣。要進一步理解 Docker 需要了解如下幾個概念:
- 鏡像(Image)
是一個只讀模板,用來運行 Docker 容器。
- 容器(Container)
負責應用程序的運行,包括操作系統、用戶添加的文件以及元數據 容器是從鏡像創建的運行實例。它可以被啟動、開始、停止、刪除。每個容器都是相互隔離的、保證安全的平臺。
注:鏡像是只讀的,容器在啟動的時候創建一層可寫層作為最上層。
- 倉庫(Repository)
倉庫是集中存放鏡像文件的場所。倉庫分為公開倉庫(Public)和私有倉庫(Private)兩種形式。
注:Docker 倉庫的概念跟 Git 類似,注冊服務器可以理解為 GitHub 這樣的托管服務
安裝Docker
$ curl -fsSL https://get.docker.com/ | sh $ sudo service docker restart
關于鏡像的操作
獲取
# 從倉庫注冊服務器拉取 $ sudo docker pull ubuntu:14.04 # 官方倉庫注冊服務器,相當于 sudo docker pull registry.hub.docker.com/ubuntu:14.04 # 也可使用其他倉庫,如: sudo docker pull dl.dockerpool.com:5000/ubuntu:12.04
顯示
$ sudo docker images
運行
$ sudo docker run -t -i ubuntu:14.04 /bin/bash
修改
# 運行容器bash,通過 shell 進行操作 $ sudo docker run -t -i ubuntu:14.04 /bin/bash # 提交更新 $ sudo docker commit -m "Added json gem" -a "Docker Newbee" 0b2616b0e5a8 ouruser/sinatra:v2
-m: 提交信息-a: 指定更新的用戶信息0b2616b0e5a8: 容器的 IDouruser/sinatra: 倉庫名v2: 倉庫 tag
創建
- 通過修改已有
image, 具體操作修改中已有 - 通過
Dockerfile來創建
$ mkdir mydockerimg # docker image 的配置文件 $ vim Dockerfile # 創建 image $ sudo docker build -t testimg .
-t: 指定新的 image 的名字.: Dockerfile 所在目錄
Dockerfile示例
# 這是注釋 FROM ubuntu:14.04 MAINTAINER AIR_CC RUN apt-get -y update CMD echo "hello-world" ADD myApp /var/www EXPOSE 80
#: 注釋FROM: 告訴 Docker 使用哪個鏡像作為基礎MAINTAINER: 維護者的信息RUN: 在創建鏡像時運行的操作CMD: 啟動容器后運行的程序ADD: 復制本地文件到鏡像WORKDIR: 設置 dockerfile 命令運行目錄EXPOSE: 對外部開放端口
3.通過文件系統導入
$ sudo cat ubuntu-14.04-x86_64-minimal.tar.gz | docker import - ubuntu:14.04
保存與載入
1.保存已有的鏡像
$ sudo docker save -o ubuntu_14.04.tar ubuntu:14.04
2.載入鏡像
$ sudo docker load --input ubuntu_14.04.tar # 或者 sudo docker load < ubuntu_14.04.tar
刪除
# 暫停并刪除相應的容器 $ sudo docker stop containerName $ sudo docker rm containerName # 刪除鏡像 $ sudo docker rmi imageName
注:刪除鏡像之前需要先stop & rm相應的container
# 刪除所有的 container $ sudo docker stop $(docker ps -a -q) $ sudo docker rm $(docker ps -a -q) # 刪除所有的 image $ sudo docker rmi $(docker images -q)
關于容器的操作
容器是獨立運行的一個或一組應用,以及它們的運行態環境。對應的,虛擬機可以理解為模擬運行的一整套操作系統(提供了運行態環境和其他系統環境)和跑在上面的應用。
運行
# 運行一下,就終止 $ sudo docker run -tid ubuntu:14.04
run的參數說明
-t: 為 container 分配一個偽終端(pseudo-tty),并綁定到容器的標準輸入上-i: 讓容器的標準輸入保持打開-d: 使容器在后臺以守護態(Daemonized)形式運行
終止
使用sudo docker stop 終止; 注:當容器中指定的應用終結時,容器也會終止 sudo docker start -- 啟動一個處于終止的容器 sudo docker restart -- 重啟一個處于運行態的容器
運行日志
獲取 container 中程序輸出到 terminal 上的信息 docker logs
進入
- 使用 attach
$ sudo docker run -tid ubuntu:14.04 $ sudo docker attach # ctrl + c: 殺死該 contanier # ctrl + p + ctrl + q: 退出 container 交互界面
- 使用 nsenter
導出與導入
- 導出
$ sudo docker export > outputFileName
- 導入
$ sudo docker import
注:用戶既可以使用 docker load 來導入鏡像存儲文件到本地鏡像庫,也可以使用 dockermport 來導入一個容器快照到本地鏡像庫。這兩者的區別在于容器快照文件將丟棄所有的歷史記錄和元數據信息(即僅保存容器當時的快照狀態),而鏡像存儲文件將保存完整記錄,體積也要大。此外,從容器快照文件導入時可以重新指定標簽等元數據信息。
刪除
$ sudo docker stop $ sudo docker rm
- 注: 刪除容器前需先
stop*
其他
獲取 container 的 PID
$ sudo docker inspect --format '{{.State.Pid}}'
4. 總結
本文從 MLops 因何而來開始分析,比較了實驗室環境下的 RML 和工業環境下的 MLops 的區別和聯系。基于 MLops 1 級架構的基本思路對 Reproducible Machine Learning 的基本架構進行了簡單的分析,之后對 MLops 1 級架構中的核心工具以及 Docker 技術進行了簡單的介紹。實際環境下的 RML 還有很多別的優秀的基本工具:MLFlows,neptune.ai,DELTA LAKES 等,可以基于這樣的架構進行更進一步的探索,使用更適合自己的工具。
5. 參考資料
[1] Why Machine Learning Models Crash And Burn In Production,https://www.forbes.com/sites/forbestechcouncil/2019/04/03/why-machine-learning-models-crash-and-burn-in-production/
[2] Software Architecture Guide,https://martinfowler.com/architecture/
[3] 從小作坊到智能中樞: MLOps簡介,https://zhuanlan.zhihu.com/p/357897337
[4] "Reproducible Deep Learning" PhD course,https://www.sscardapane.it/teaching/reproducibledl/
[5] What is Continuous Machine Learning? https://levity.ai/blog/what-is-continuous-machine-learning
[6] Machine Learning: The High Interest Credit Card of Technical Debt,https://research.google/pubs/pub43146/
[7] MLOps:機器學習中的持續交付和自動化流水線,https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
[8] A curated list of awesome open source libraries to deploy, monitor, version and scale your machine learning,https://github.com/EthicalML/awesome-production-machine-learning
[9] Efficient Hyperparameter Optimization with Optuna Framework,https://broutonlab.com/blog/efficient-hyperparameter-optimization-with-optuna-framework
[10] Data science experiments management with Weights & Biases, https://wandb.ai/broutonlab/first_steps/reports/Data-Science-Experiments-Management-with-Weights-Biases---Vmlldzo2NjE3MDI
[11] A project-based course on the foundations of MLOps with a focus on intuition and application,https://github.com/GokuMohandas/mlops
[12] Full Stack Deep Learning (UC Berkeley CS194-080),https://fullstackdeeplearning.com/
[13] Amazon SageMaker Feature Store,https://aws.amazon.com/cn/sagemaker/feature-store/
[14] Why We Need DevOps for ML Data,https://www.tecton.ai/blog/devops-ml-data/
[15] Use Amazon SageMaker Feature Store with Amazon SageMaker Studio,https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-use-with-studio.html
[16] Feature Store for ML,https://www.featurestore.org/
[17] 用 Facebook Hydra 參數配置框架來簡化程序配置,https://www.cnblogs.com/rossiXYZ/p/14826431.html
[18] Docker 的基本使用,https://www.jianshu.com/p/8c932dadceef
[19] Docker 入門教程,https://www.ruanyifeng.com/blog/2018/02/docker-tutorial.html
[20] Data Version Control With Python and DVC,https://realpython.com/python-data-version-control/
[21] Comparing Data Version Control Tools - 2020,https://dagshub.com/blog/data-version-control-tools/
[22] 一小時搭建一個云原生機器學習平臺,https://zhuanlan.zhihu.com/p/383528646
[23] Continuous Delivery for Machine Learning,https://martinfowler.com/articles/cd4ml.html
[24] Machine Learning Operations,https://ml-ops.org/
[25] Introducing MLOps,https://www.oreilly.com/library/view/introducing-mlops/9781492083283/
[26] Visualizer for neural network, deep learning, and machine learning models,https://github.com/lutzroeder/netron