【技術分享】源碼解析angr的模擬執行
前言
angr是很有名的二進制符號執行工具,網上有許多關于angr的源碼解析的文章。但是好像還沒有關于angr模擬執行模塊的解析。而模擬執行部分也是angr中相當重要的一個部分。因此,本文將解析angr模擬執行部分的源碼,來幫助大家了解angr模擬執行的基本原理。
概述
當我們用angr去符號執行的時候,最基本的幾個操作如下面代碼所示:導入代碼(第1行)、導入二進制(第2行)、確定初始狀態(第3行)、構建simulation_manager對象(第4行)、模擬執行(第5行)。而到底angr是怎么符號執行的呢?因此就需要深入simulation_manager的源碼(sim_manager.py)去一探究竟了。
import angrp = angr.Project("xxxx")entry_state = p.factory.entry_state()simgr = p.factory.simgr(entry_state)#simgr是simulation_manager的別名simgr.explore(find=xxxx)
simulation_manager這個類位于angr/sim_manager.py文件里。
simulation_manager是angr中模擬執行管理器。主要的操作對象是程序的狀態對象(sim_state)。狀態都被放在stash里,可以往前執行、過濾、合并或者移到別的stash里。stash里可以理解為是放狀態的一個列表,stash有這么幾種,分別表示狀態的狀態:
(1) active:保存接下來要執行的狀態
(2) deadended:由于某些原因不能再繼續執行下去,比如沒有合法的指令、下個節點的狀態不可解,或者有一個非法的指令指針。
(3) pruned:當使用lazy_sovles的策略時,只有在必要的時候才去檢查狀態是否可解。當發現一個不可求解的節點后,將其后面的節點都優化掉,放在pruned里。
(4) unconstrained:比如PC被用戶數據或者其他類型的符號變量所控制,導致不知道執行哪個指令。
(5) unsat:不可求解的狀態。比如,輸入同時為AAAA和BBBB。
接下來看看源碼,源碼中提示我們看simulation_manager的三個重要方法:step、explore、use_technique。
use_technique
angr里有自帶很多啟發式的路徑探索方法。這個函數就是讓simulation_manager能夠調用外部寫好的啟發式路徑搜索方法。官方給出的幾個樣例里,除了經典的深度優先搜索、也有檢測內存使用情況、CMU論文里的Veritest(合并循環的狀態)等等策略。
代碼首先先判斷tech是否屬于ExplorationTechnique這個類。然后setup方法開始初始化。然后把tech防到techniques列表中去,這也意味著可以使用多種策略。這里的hookset暫時沒有看懂。
def use_technique(self, tech): """ Use an exploration technique with this SimulationManager. Techniques can be found in :mod:`angr.exploration_techniques`. :param tech: An ExplorationTechnique object that contains code to modify this SimulationManager's behavior. :type tech: ExplorationTechnique :return: The technique that was added, for convenience """ if not isinstance(tech, ExplorationTechnique): raise SimulationManagerError
# XXX: as promised tech.project = self._project tech.setup(self)
HookSet.install_hooks(self, **tech._get_hooks()) self._techniques.append(tech) return tech
explore
先來看看看explore函數的參數,有stash,n,find,avoid等參數。explore函數的功能是從某個類型的stash,比如active,開始尋找滿足find條件的,需要避免avoid條件的狀態,直到找了n次,或者找到了num_find個狀態。然后找到的狀態都會塞到find_stash里,篩選的狀態都會放在avoid_stash里。
其中find和avoid參數可以是一個地址,或者一堆地址的集合或者列表,甚至可以是一個函數,以狀態為輸入,輸出True 或者False,來表示該狀態是否是要尋找的狀態。如果angr的CFG作為cfg的參數并且find是一個地址或者一個列表或者集合,那么到達不了目標狀態的狀態就會先把提前篩選掉。
def explore(self, stash='active', n=None, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None, num_find=1, **kwargs): """ Tick stash "stash" forward (up to "n" times or until "num_find" states are found), looking for condition "find", avoiding condition "avoid". Stores found states into "find_stash' and avoided states into "avoid_stash". The "find" and "avoid" parameters may be any of: - An address to find - A set or list of addresses to find - A function that takes a state and returns whether or not it matches. If an angr CFG is passed in as the "cfg" parameter and "find" is either a number or a list or a set, then any states which cannot possibly reach a success state without going through a failure state will be preemptively avoided. """ num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else 0 tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))
# Modify first Veritesting so that they can work together. deviation_filter_saved = None for t in self._techniques: if isinstance(t,Veritesting): deviation_filter_saved = t.options.get("deviation_filter",None) if deviation_filter_saved is not None: t.options["deviation_filter"] = lambda s: tech.find(s) or tech.avoid(s) or deviation_filter_saved(s) else: t.options["deviation_filter"] = lambda s: tech.find(s) or tech.avoid(s) break
try: self.run(stash=stash, n=n, **kwargs) finally: self.remove_technique(tech)
for t in self._techniques: if isinstance(t,Veritesting): if deviation_filter_saved is None: del t.options["deviation_filter"] else: t.options["deviation_filter"] = deviation_filter_saved break
return self
宏觀來看explore函數分為三部分:初始化,兼容veritest策略,探索(run)。兼容veritest策略的代碼占了很多,對于理解veritest策略與其他策略的關系很有幫助,但是對我們理解符號執行幫助較小,這里就不贅述了。
首先,更新num_find的參數為設定的num_find參數加上找到的狀態。接著,用傳入的參數find,avoid等生成Explorer對象,然后再用use_technique方法生成一個tech對象。這里為什么要生成Explore對象,然后再用use_technique方法?
Explorer對象繼承了ExplorationTechnique類,所以他也是一種探索策略,并且是一種最基礎的策略。
而符號執行過程中,可以使用多種策略,那么如何綜合這些策略呢?angr是把他們都放在了simulationmanager里的.techniques列表里,而use_technique方法的作用正是把策略對象放進這個techniques列表里。
num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else 0 tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))
初始化后,接下來就是去探索狀態部分。簡單的一個try,finally語句。不論run的結果如何,最后都把基礎探索策略移出_techniques列表里。
try: self.run(stash=stash, n=n, **kwargs) finally: self.remove_technique(tech)
run函數的代碼如下,思路很簡單,根據當前的探索策略,一直探索,直到到達一個完整的狀態。如果策略里沒定義完整的策略,那就把stash里的狀態都跑完。run里涉及到了后面會講的step函數,這里可以先簡單理解為單步符號執行。
def run(self, stash='active', n=None, until=None, **kwargs): """ Run until the SimulationManager has reached a completed state, according to the current exploration techniques. If no exploration techniques that define a completion state are being used, run until there is nothing left to run. :param stash: Operate on this stash :param n: Step at most this many times :param until: If provided, should be a function that takes a SimulationManager and returns True or False. Stepping will terminate when it is True. :return: The simulation manager, for chaining. :rtype: SimulationManager """ for _ in (itertools.count() if n is None else range(0, n)): if not self.complete() and self._stashes[stash]: self.step(stash=stash, **kwargs) if not (until and until(self)): continue break return self
step
最后就是這個比較復雜的step函數了,可以看作是符號執行的基本單元了。相比explore函數的參數多了selector_func,step_func,successor_func,filter_func,until。這些參數的意思代碼注釋寫得比較清楚了,就簡單翻譯一下。這些參數都是一個以狀態為輸入,返回各種東西(比如bool值,后繼節點等)的一個函數,類似下面的代碼。
def fun(state): if state.addr == xxxx: return True else: return False
- selector_func:如果為True,將會繼續步進,反之會被保留。
- successor_func:返回的是后繼節點,后面將會使用這些后繼節點去符號執行。反之,則是使用project.factory.successors的后繼節點。
- filter_func:返回的是stash的名字。filter_func的主要作用是給狀態分類,分到各個stash里去。
- step_func:與前面參數不同,輸入是為simulation_manger對象,并返回simulation_manager對象。這個函數會在simulation_manager對象每次step的時候被調用。
def step(self, stash='active', n=None, selector_func=None, step_func=None, successor_func=None, until=None, filter_func=None, **run_args): """ Step a stash of states forward and categorize the successors appropriately. The parameters to this function allow you to control everything about the stepping and categorization process. :param stash: The name of the stash to step (default: 'active') :param selector_func: If provided, should be a function that takes a state and returns a boolean. If True, the state will be stepped. Otherwise, it will be kept as-is. :param step_func: If provided, should be a function that takes a SimulationManager and returns a SimulationManager. Will be called with the SimulationManager at every step. Note that this function should not actually perform any stepping - it is meant to be a maintenance function called after each step. :param successor_func: If provided, should be a function that takes a state and return its successors. Otherwise, project.factory.successors will be used. :param filter_func: If provided, should be a function that takes a state and return the name of the stash, to which the state should be moved. :param until: (DEPRECATED) If provided, should be a function that takes a SimulationManager and returns True or False. Stepping will terminate when it is True. :param n: (DEPRECATED) The number of times to step (default: 1 if "until" is not provided) Additionally, you can pass in any of the following keyword args for project.factory.successors: :param jumpkind: The jumpkind of the previous exit :param addr: An address to execute at instead of the state's ip. :param stmt_whitelist: A list of stmt indexes to which to confine execution. :param last_stmt: A statement index at which to stop execution. :param thumb: Whether the block should be lifted in ARM's THUMB mode. :param backup_state: A state to read bytes from instead of using project memory. :param opt_level: The VEX optimization level to use. :param insn_bytes: A string of bytes to use for the block instead of the project. :param size: The maximum size of the block, in bytes. :param num_inst: The maximum number of instructions. :param traceflags: traceflags to be passed to VEX. Default: 0 :returns: The simulation manager, for chaining. :rtype: SimulationManager """ l.info("Stepping %s of %s", stash, self) # 8<----------------- Compatibility layer ----------------- if n is not None or until is not None: if once('simgr_step_n_until'): print("\x1b[31;1mDeprecation warning: the use of `n` and `until` arguments is deprecated. " "Consider using simgr.run() with the same arguments if you want to specify " "a number of steps or an additional condition on when to stop the execution.\x1b[0m") return self.run(stash, n, until, selector_func=selector_func, step_func=step_func, successor_func=successor_func, filter_func=filter_func, **run_args) # ------------------ Compatibility layer ---------------->8 bucket = defaultdict(list)
for state in self._fetch_states(stash=stash):
goto = self.filter(state, filter_func=filter_func) if isinstance(goto, tuple): goto, state = goto
if goto not in (None, stash): bucket[goto].append(state) continue
if not self.selector(state, selector_func=selector_func): bucket[stash].append(state) continue
pre_errored = len(self._errored)
successors = self.step_state(state, successor_func=successor_func, **run_args) # handle degenerate stepping cases here. desired behavior: # if a step produced only unsat states, always add them to the unsat stash since this usually indicates a bug # if a step produced sat states and save_unsat is False, drop the unsats # if a step produced no successors, period, add the original state to deadended
# first check if anything happened besides unsat. that gates all this behavior if not any(v for k, v in successors.items() if k != 'unsat') and len(self._errored) == pre_errored: # then check if there were some unsats if successors.get('unsat', []): # only unsats. current setup is acceptable. pass else: # no unsats. we've deadended. bucket['deadended'].append(state) continue else: # there were sat states. it's okay to drop the unsat ones if the user said so. if not self._save_unsat: successors.pop('unsat', None)
for to_stash, successor_states in successors.items(): bucket[to_stash or stash].extend(successor_states)
self._clear_states(stash=stash) for to_stash, states in bucket.items(): self._store_states(to_stash or stash, states)
if step_func is not None: return step_func(self) return self
首先,從stash里取出一個狀態,調用filter函數看下該狀態最后要去哪個stash里,如果不是當前的stash,則把該狀態塞到應該放的stash的地方,然后取下一個狀態。調用selector函數,選擇要保留的狀態。
bucket = defaultdict(list)# 依次從stash里取出狀態for state in self._fetch_states(stash=stash): goto = self.filter(state, filter_func=filter_func) # 返回的是個元組,(狀態該去的stash,狀態) if isinstance(goto, tuple): goto, state = goto #如果要去的stash不是當前的stash,也不是None, if goto not in (None, stash): # 那么把他放進該去的stash里,就不管他了。也就篩選掉了。 bucket[goto].append(state) # continue # 如果selector函數返回False,則需要保留該狀態到當前的stash if not self.selector(state, selector_func=selector_func): # 保留狀態 bucket[stash].append(state) continue
如果沒有觸發selector或者filter,就去找后繼節點。這里調用了step_state函數。
for state in self._fetch_states(stash=stash): ... successors = self.step_state(state, successor_func=successor_func, **run_args)
step_state函數如下所示,這個函數主要是處理后繼節點的狀態。將不可解的狀態,無約束的狀態都放在相應的stash里。
def step_state(self, state, successor_func=None, **run_args): """ Don't use this function manually - it is meant to interface with exploration techniques. """ try: successors = self.successors(state, successor_func=successor_func, **run_args) stashes = {None: successors.flat_successors, 'unsat': successors.unsat_successors, 'unconstrained': successors.unconstrained_successors} except: ... return stashes
由于step_state函數可能會發生很多錯誤,因此后續的代碼是去做后繼節點錯誤狀態的處理。
for state in self._fetch_states(stash=stash): ... #如果有后繼節點有任何一個unsat狀態或者發生了新的錯誤 if not any(v for k, v in successors.items() if k != 'unsat') and len(self._errored) == pre_errored: #對于unsat狀態,就先不管他 if successors.get('unsat', []): # only unsats. current setup is acceptable. pass else: #如果不是unsat,那說明遇到某些原因終止了,把該狀態加到deadended的stash里去。 bucket['deadended'].append(state) continue else: # 如果沒有設置保留unsat狀態,就把后繼節點的unsat狀態丟出去。 if not self._save_unsat: successors.pop('unsat', None)
接下來就把后繼節點加到bucket的to_stash或者stash里去。自此,這個for循環就結束了。
for state in self._fetch_states(stash=stash): ... for to_stash, successor_states in successors.items(): bucket[to_stash or stash].extend(successor_states)
剩下就是一些收尾工作,清空當前stash里的狀態,然后再把bucket的內容存到simulation_manager對象的stash里去。
self._clear_states(stash=stash)for to_stash, states in bucket.items(): self._store_states(to_stash or stash, states)
如果有設置step_func,就去調用step_func。由此也能看到step_func是在step函數最后調用的。
if step_func is not None: return step_func(self)
總結
angr模擬執行部分的主要代碼就解析到這里了,希望大家能夠對angr的模擬執行有更深的理解。在理解了angr這部分的內容之后,應該能夠比較容易地擴展angr的探索策略。
本文只涉及了sim_manager.py中的幾個重要地方,如果要熟練使用simulation_manager的各種功能的話,比如split,merge等等,還需要再看看源碼。
參考資料:
https://github.com/angr/angr/tree/master/angr