聯邦學習開源框架FATE-Flow 源碼分析
一. FATE整體架構
FATE是首個工業級的開源聯邦學習框架,據中國信通院數據顯示,55%的國內隱私計算產品是基于或參考了開源項目,其中以FATE開源項目為主。FATE實現了基于同態加密和安全多方計算的安全協議。FATE支持橫縱向聯邦學習場景,包含了多種聯邦學習算法,包括邏輯回歸、secure boost、深度學習等。FATE提供了聯邦學習全流程的解決方案,具備開箱即用的特點。FATE整體架構和基本流程如下圖1、2所示,本篇文章將主要介紹聯邦學習任務調度的核心:FATE-Flow。

圖1:FATE整體架構

圖2:FATE流程
二. FATE-Flow架構
FATE-Flow提供了端到端的聯邦學習任務流水線管理模塊,架構如圖3所示。

圖3:FATE-Flow流程
在FATE-Flow中,由如下幾個關鍵模塊:
- DAG:定義了流水線,使用JSON格式的DSL來定義DAG。
- DSL Parser:是調度的核心,通過 DSL parser 解析到上下游關系及依賴。
- Job Scheduler:一個DAG作為一個Job,而DAG中的節點稱為Task,一個Job由若干Task組成。
- Task Controller:最小的調度單位,FATE-Flow將Task的執行獨立為隔離的進程。
- Data Manager:用作數據的上傳、下載等。
- Resource Manager: 負責計算整個Job需要的資源的大小,返回資源申請狀態等。只有多方資源申請成功,才會向各方發送start job指令。
提交一個Job的流程如下:Job首先提交到Queue中,JobScheduler解析DAG加入到Task Queue中,調度 Executor執行,同時這個任務會分發到聯邦學習的各個參與方。在任務執行中會收集參與方狀態,進行下一步的調度。Task stat記錄Task的狀態信息,例如啟動時間、運行狀態、結束時間、超時時間等。如果Task運行時間超過默認超時時間、異常終止或者正常運行,則啟動shutdown,結束進程,清理任務,同步到所有聯邦參與方,保證任務的一致性。
三. 源碼分析
FATE-Flow后端使用的是Flask,Flask是一個輕量級的python web框架,FATE-Flow server的程序入口是在python/fate_flow/fate_flow_server.py。如圖4所示,通過源碼分析,我們發現啟動了兩個server:9380端口的http server 和9360端口的grpc server。http接口用作自身api的調用,而grpc 則用作參與方間函數調用。

圖4:Flask啟動http server
熟悉Flask的朋友都知道,Flask使用藍圖來組裝不同的組件,在Fate-Flow server中同樣如此。在apps目錄下構成了后端程序的基本組件,其中主要包含:
- checkpoint_app: 數據/模型更新
- component_app.py:獲取組件詳情,驗證組件參數
- data_access_app.py:數據上傳及下載
- info_app.py:獲取mysql、fateboard、eggroll版本信息
- job_app.py:核心,提供job、task、查詢等接口
- table_app.py:數據表的操作,通過mysql導入數據也是在這里完成的
- tracking_app.py:獲取組件的狀態,包括獲得組件的輸出也是在這里完成的
- version_app.py:顯示FATE相關版本信息,其中docker-compose部署healthy檢查就是通過此接口。
- log_app.py:負責log相關接口,解決了之前版本中fateboard無法獲取部分log的bug
- permission_app.py:權限控制相關接口
- model_app.py:模型下載、部署,用作在線推理
3.1
輪詢檢測與調度
如圖5所示,Detector每5s執行一次,負責檢測運行中、結束的Job、Task、資源等;而DAGScheduler每2s執行一次,依次調度waiting、running、ready、rerun狀態的Job,更新結束 Job的狀態。

圖5:兩個輪詢方法
3.2
從提交Job分析源碼
job_app接收請求
用戶通過flow client cli提交任務,其實是向FATE-Flow server的9380端口發送http請求,在job_app中接受請求后,調用DAGScheduler.submit。
DAGScheduler提交Job
如圖6所示,DAGScheduler的submit方法生成Job id,進行Job相關配置,調用FederatedScheuler.create_job方法通知各方創建Job。這一步實際上是調用了federated_command 方法,通過grpc向各參與方發送rpc或http請求。initiator為每一方、每一個task初始化,并記錄在數據庫中(見t_tack表)。如果均未出錯,則將Job的狀態設置為WAITING。

圖6:發送http或rpc請求各方創建Job

圖7:initiator記錄Task信息
DAGScheduler調度waiting的Job
在2.1 節中我們提到過,DAGScheduler每2s一次調度Job。對于waiting狀態的Job,DAGScheduler首先檢查Job的狀態是否被取消,然后嘗試在各方申請資源,如果資源申請成功則調用start_job開始Job,向各個參與方發送開始Job的請求。各個參與方在收到請求后,將Job狀態改為RUNNING。

圖8:各參與方開始Job
DAGScheduler調度running的Job
對于running狀態的Job,實際調用的是TaskScheduler.schedule方法,在該方法中,獲取所有Task,并將Task的狀態同步。對于WAITTING狀態的Task,調用start_task方法開啟Task。initiator 向各方發送start task的rpc請求。

圖9:向各方發送start_task請求
TaskController執行Task
在收到start_task的rpc請求后,各方調用TaskController的start_task方法,對于eggroll引擎來說,實際上是啟動python子進程。

圖10:啟動python子進程
計算Task、Job狀態
在完成Task后會計算各參與方的狀態,如圖11所示,分為以下幾種情況:全部都是waiting狀態、存在interrupt狀態、存在running狀態、waiting和success狀態、全部都是end狀態。

圖11:狀態計算
四. 結語
如果說FederatedML是大腦,那么FATE-Flow就是骨架。FATE-Flow從整個任務生命周期的管理,到上層對外暴露API結構,在整個聯邦學習中起著舉足輕重的作用。無論是各個廠商在開發自家的隱私計算平臺,還是個人用戶使用命令行工具,其實都是在與FATE-Flow server打交道。在最新的1.9版本中,FATE-Flow也增加了新的功能:例如授權認證、負載均衡等。由于篇幅所限,本文僅從Job提交的角度來分析FATE-Flow的流程,感興趣的同學也可閱讀相關源碼。
內容編輯:創新研究院 高翔
責任編輯:創新研究院 陳佛忠
本公眾號原創文章僅代表作者觀點,不代表綠盟科技立場。所有原創內容版權均屬綠盟科技研究通訊。未經授權,嚴禁任何媒體以及微信公眾號復制、轉載、摘編或以其他方式使用,轉載須注明來自綠盟科技研究通訊并附上本文鏈接。