[Tech Share] OLLVM Deflattener

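Preface
In May I ran into several samples protected by control-flow flattening. Having had no prior exposure to the technique, I did not analyze them in depth.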

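In early July I read the blog post MODeflattener – Miasm’s OLLVM Deflattener, whose author also published the source code on GitHub, so I decided to dig into it. The tool is built on the miasm framework. Quarkslab's 2014 post Deobfuscation: recovering an OLLVM-protected program already used this framework, but it relied on symbolic execution to walk the code, compute the destination of each relevant block, and recover the execution flow. Tencent SRC's 2017 post 利用符號執行去除控制流平坦化 (removing control-flow flattening with symbolic execution) also used symbolic execution, based on the angr framework. MODeflattener, by contrast, recovers the execution flow through static analysis.

Below, I first walk through its source code and the role of each function using concrete samples, then point out the tool's shortcomings and propose improvements. For the miasm classes, functions, attributes, and methods involved, see the miasm documentation and the miasm source. LLVM, OLLVM, LLVM passes, and control-flow flattening itself are not covered again here; see 基于LLVM Pass實現控制流平坦化 and the OLLVM Flattening source.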
Source Code Analysis
0x01.1 Detecting Flattening
def calc_flattening_score(asm_graph):
    score = 0.0
    for head in asm_graph.heads_iter():
        dominator_tree = asm_graph.compute_dominator_tree(head)
        for block in asm_graph.blocks:
            block_key = asm_graph.loc_db.get_offset_location(block.lines[0].offset)
            dominated = set(
                [block_key] + [b for b in dominator_tree.walk_depth_first_forward(block_key)])
            if not any([b in dominated for b in asm_graph.predecessors(block_key)]):
                continue
            score = max(score, len(dominated)/len(asm_graph.nodes()))
    return score
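For every block that has at least one predecessor among the blocks it dominates, the function counts the blocks it dominates (itself included), divides by the total number of blocks in the graph, and keeps the highest score. A score of 0.9 or more is taken to mean the function has been flattened. See the figure below: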

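Block b dominates every block except block a (b itself included), so its score is 9/10 = 0.9, which is the highest score. Readers can look up dominator trees and predecessors on their own; they are not covered here.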
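To make the 9/10 example concrete, here is a minimal sketch that reproduces it without miasm. The graph shape, the block names, and the helpers `dominators` and `flattening_score` are all made up for illustration; they only mirror the logic of calc_flattening_score and are not part of MODeflattener.

```python
def dominators(succ, entry):
    """Return ({node: set of its dominators}, {node: set of predecessors})."""
    nodes = set(succ)
    preds = {n: set() for n in nodes}
    for n, outs in succ.items():
        for m in outs:
            preds[m].add(n)
    # Classic iterative data-flow: start from "every node dominates every node"
    # and shrink the sets until nothing changes.
    dom = {n: set(nodes) for n in nodes}
    dom[entry] = {entry}
    changed = True
    while changed:
        changed = False
        for n in nodes - {entry}:
            incoming = [dom[p] for p in preds[n]]
            new = {n} | (set.intersection(*incoming) if incoming else set())
            if new != dom[n]:
                dom[n] = new
                changed = True
    return dom, preds


def flattening_score(succ, entry):
    dom, preds = dominators(succ, entry)
    score = 0.0
    for block in succ:
        dominated = {n for n in succ if block in dom[n]}
        # Only score blocks that are reached from a block they dominate.
        if not any(p in dominated for p in preds[block]):
            continue
        score = max(score, len(dominated) / len(succ))
    return score


# Hypothetical flattened function: a -> b (dispatcher), b -> r1..r7 -> p -> b.
toy = {"a": ["b"], "b": ["r%d" % i for i in range(1, 8)], "p": ["b"]}
toy.update({"r%d" % i: ["p"] for i in range(1, 8)})

print(flattening_score(toy, "a"))  # 0.9
```

A straight-line graph has no block with a predecessor inside its own dominated set, so the same function returns 0.0 for it.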
0x01.2 Gathering Flattening Information

def get_cff_info(asmcfg):
    preds = {}
    for blk in asmcfg.blocks:
        offset = asmcfg.loc_db.get_location_offset(blk.loc_key)
        preds[offset] = asmcfg.predecessors(blk.loc_key)
    # pre-dispatcher is the one with max predecessors
    pre_dispatcher = sorted(preds, key=lambda key: len(preds[key]), reverse=True)[0]
    # dispatcher is the one which succeeds pre-dispatcher
    dispatcher = asmcfg.successors(asmcfg.loc_db.get_offset_location(pre_dispatcher))[0]
    dispatcher = asmcfg.loc_db.get_location_offset(dispatcher)

    # relevant blocks are those which precede the pre-dispatcher
    relevant_blocks = []
    for loc in preds[pre_dispatcher]:
        offset = asmcfg.loc_db.get_location_offset(loc)
        relevant_blocks.append(get_block_father(asmcfg, offset))

    return relevant_blocks, dispatcher, pre_dispatcher
The block with the largest number of predecessors is the pre-dispatcher, the successor of the pre-dispatcher is the dispatcher, and every predecessor of the pre-dispatcher is a relevant block.
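The same three rules can be tried on a plain dictionary graph. This is only a sketch: `cff_roles` and the block names are hypothetical, and it skips the get_block_father step that the real function applies to each predecessor.

```python
def cff_roles(succ):
    """Apply the three rules to a {block: [successors]} dictionary."""
    preds = {n: [] for n in succ}
    for n, outs in succ.items():
        for m in outs:
            preds[m].append(n)
    # Rule 1: the block with the most predecessors is the pre-dispatcher.
    pre_dispatcher = max(preds, key=lambda n: len(preds[n]))
    # Rule 2: its (first) successor is the dispatcher.
    dispatcher = succ[pre_dispatcher][0]
    # Rule 3: every predecessor of the pre-dispatcher is a relevant block.
    relevant_blocks = sorted(preds[pre_dispatcher])
    return relevant_blocks, dispatcher, pre_dispatcher


# Hypothetical flattened layout with three relevant blocks and one exit block.
toy = {
    "entry": ["disp"],
    "disp": ["r1", "r2", "r3", "ret"],
    "r1": ["pre"], "r2": ["pre"], "r3": ["pre"],
    "pre": ["disp"],
    "ret": [],
}

print(cff_roles(toy))  # (['r1', 'r2', 'r3'], 'disp', 'pre')
```

Note that the heuristic depends on the pre-dispatcher having strictly more predecessors than any other block; with only two relevant blocks in this toy layout, the dispatcher would tie with it.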
dispatcher_blk = main_asmcfg.getby_offset(dispatcher)
dispatcher_first_instr = dispatcher_blk.lines[0]
state_var = dispatcher_first_instr.get_args_expr()[1]
The state variable is read from the first instruction of the dispatcher block (its second operand):

0x01.3 Deflattening
for addr in relevant_blocks:
    _log.debug("Getting info for relevant block @ %#x"%addr)
    loc_db = LocationDB()
    mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
    mdis.dis_block_callback = stop_on_jmp
    asmcfg = mdis.dis_multiblock(addr)

    lifter = machine.lifter_model_call(loc_db)
    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
    ircfg_simplifier = IRCFGSimplifierCommon(lifter)
    ircfg_simplifier.simplify(ircfg, addr)
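The loop visits each relevant block and builds an ASM CFG and an IR CFG for it. The author also provides a save_cfg function that renders a CFG to an image (Graphviz must be installed):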
def save_cfg(cfg, name):
    import subprocess
    open(name, 'w').write(cfg.dot())
    subprocess.call(["dot", "-Tpng", name, "-o", name.split('.')[0]+'.png'])
    subprocess.call(["rm", name])
Next, it finds every statement that uses or defines the state variable:
nop_addrs = find_state_var_usedefs(ircfg, state_var)
...
def find_state_var_usedefs(ircfg, search_var):
    var_addrs = set()
    reachings = ReachingDefinitions(ircfg)
    digraph = DiGraphDefUse(reachings)
    # the state var always a leaf
    for leaf in digraph.leaves():
        if leaf.var == search_var:
            for x in (digraph.reachable_parents(leaf)):
                var_addrs.add(ircfg.get_block(x.label)[x.index].instr.offset)
    return var_addrs

The figure shows how the def-use graph maps onto the statements:

Find all possible values of the state variable:
def find_var_asg(ircfg, var):
    val_list = []
    res = {}
    for lbl, irblock in viewitems(ircfg.blocks):
        for assignblk in irblock:
            result = set(assignblk).intersection(var)
            if not result:
                continue
            else:
                dst, src = assignblk.items()[0]
                if isinstance(src, ExprInt):
                    res['next'] = int(src)
                    val_list += [int(src)]
                elif isinstance(src, ExprSlice):
                    phi_vals = get_phi_vars(ircfg)
                    res['true_next'] = phi_vals[0]
                    res['false_next'] = phi_vals[1]
                    val_list += phi_vals
    return res, val_list
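The state variable is assigned in one of two ways. The first is a direct assignment with no condition or branch involved, which corresponds to ExprInt: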


The second is a conditional assignment through CMOV, which corresponds to ExprSlice:


Next, it works out which relevant block each possible state value leads to:
# get the value for reaching a particular relevant block
for lbl, irblock in viewitems(main_ircfg.blocks):
    for assignblk in irblock:
        asg_items = assignblk.items()
        if asg_items:    # do not enter if nop
            dst, src = asg_items[0]
            if isinstance(src, ExprOp):
                if src.op == 'FLAG_EQ_CMP':
                    arg = src.args[1]
                    if isinstance(arg, ExprInt):
                        if int(arg) in val_list:
                            cmp_val = int(arg)
                            var, locs = irblock[-1].items()[0]
                            true_dst = main_ircfg.loc_db.get_location_offset(locs.src1.loc_key)
                            backbone[hex(cmp_val)] = hex(true_dst)
As shown in the figure below:

The state-variable values stored in fixed_cfg are then replaced with the addresses of the target relevant blocks:
for offset, link in fixed_cfg.items():
    if 'cond' in link:
        tval = fixed_cfg[offset]['true_next']
        fval = fixed_cfg[offset]['false_next']
        fixed_cfg[offset]['true_next'] = backbone[tval]
        fixed_cfg[offset]['false_next'] = backbone[fval]
    elif 'next' in link:
        fixed_cfg[offset]['next'] = backbone[link['next']]
    else:
        # the tail doesn't has any condition
        tail = int(offset, 16)
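As a small illustration of this substitution, the sketch below runs the same lookup on hand-written dictionaries. The addresses, the pairing of state values with blocks, and the function name `resolve_targets` are invented for the example; unlike the loop above, it returns a new dictionary instead of modifying fixed_cfg in place.

```python
def resolve_targets(fixed_cfg, backbone):
    """Swap state-variable values for the block addresses they dispatch to."""
    resolved = {}
    for offset, link in fixed_cfg.items():
        if "cond" in link:
            # Conditional assignment: two possible successors.
            resolved[offset] = {
                "cond": link["cond"],
                "true_next": backbone[link["true_next"]],
                "false_next": backbone[link["false_next"]],
            }
        elif "next" in link:
            # Direct assignment: a single successor.
            resolved[offset] = {"next": backbone[link["next"]]}
        else:
            # Tail block: it never sets the state variable again.
            resolved[offset] = {}
    return resolved


# Hypothetical data: state value -> address of the block it selects.
backbone = {"0xb3584423": "0x401100", "0xcae581f5": "0x401200"}
fixed_cfg = {
    "0x401000": {"cond": "CMOVNZ",
                 "true_next": "0xb3584423",
                 "false_next": "0xcae581f5"},
    "0x401100": {"next": "0xcae581f5"},
    "0x401200": {},
}

resolved = resolve_targets(fixed_cfg, backbone)
print(resolved["0x401000"]["true_next"])  # 0x401100
```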
All irrelevant instructions are replaced with NOPs:
for addr in rel_blk_info.keys():
    _log.info('=> cleaning relevant block @ %#x' % addr)
    asmcfg, nop_addrs = rel_blk_info[addr]
    link = fixed_cfg[hex(addr)]
    instrs = [instr for blk in asmcfg.blocks for instr in blk.lines]
    last_instr = instrs[-1]
    end_addr = last_instr.offset + last_instr.l
    # calculate original length of block before patching
    orig_len = end_addr - addr
    # nop the jmp to pre-dispatcher
    nop_addrs.add(last_instr.offset)
    _log.debug('nop_addrs: ' + ', '.join([hex(addr) for addr in nop_addrs]))
    patch = patch_gen(instrs, asmcfg.loc_db, nop_addrs, link)
    patch = patch.ljust(orig_len, b"\x90")
    patches[addr] = patch
    _log.debug('patch generated %s' % encode_hex(patch))

_log.info(">>> NOPing Backbone (%#x - %#x) <<<" % (backbone_start, backbone_end))
nop_len = backbone_end - backbone_start
patches[backbone_start] = b"\x90" * nop_len
The patch_gen function used above is defined as follows:
def patch_gen(instrs, loc_db, nop_addrs, link):
    final_patch = b""
    start_addr = instrs[0].offset

    for instr in instrs:
        #omitting useless instructions
        if instr.offset not in nop_addrs:
            if instr.is_subcall():
                #generate asm for fixed calls with relative addrs
                patch_addr = start_addr + len(final_patch)
                tgt = loc_db.get_location_offset(instr.args[0].loc_key)
                _log.info("CALL %#x" % tgt)
                call_patch_str = "CALL %s" % rel(tgt, patch_addr)
                _log.debug("call patch : %s" % call_patch_str)
                call_patch = asmb(call_patch_str, loc_db)
                final_patch += call_patch
                _log.debug("call patch asmb : %s" % encode_hex(call_patch))
            else:
                #add the original bytes
                final_patch += instr.b

    patch_addr = start_addr + len(final_patch)
    _log.debug("jmps patch_addr : %#x", patch_addr)
    jmp_patches = b""
    # cleaning the control flow by patching with real jmps locs
    if 'cond' in link:
        t_addr = int(link['true_next'], 16)
        f_addr = int(link['false_next'], 16)
        jcc = link['cond'].replace('CMOV', 'J')
        _log.info("%s %#x" % (jcc, t_addr))
        _log.info("JMP %#x" % f_addr)

        patch1_str = "%s %s" % (jcc, rel(t_addr, patch_addr))
        jmp_patches += asmb(patch1_str, loc_db)
        patch_addr += len(jmp_patches)

        patch2_str = "JMP %s" % (rel(f_addr, patch_addr))
        jmp_patches += asmb(patch2_str, loc_db)
        _log.debug("jmp patches : %s; %s" % (patch1_str, patch2_str))

    else:
        n_addr = int(link['next'], 16)
        _log.info("JMP %#x" % n_addr)

        patch_str = "JMP %s" % rel(n_addr, patch_addr)
        jmp_patches = asmb(patch_str, loc_db)

        _log.debug("jmp patches : %s" % patch_str)

    _log.debug("jmp patches asmb : %s" % encode_hex(jmp_patches))
    final_patch += jmp_patches

    return final_patch
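It first checks whether each retained instruction is a function call. If it is, the call is re-assembled with a displacement recomputed for its new position in the patch; otherwise the original instruction bytes are copied as they are: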
if instr.is_subcall():
    #generate asm for fixed calls with relative addrs
    patch_addr = start_addr + len(final_patch)
    tgt = loc_db.get_location_offset(instr.args[0].loc_key)
    _log.info("CALL %#x" % tgt)
    call_patch_str = "CALL %s" % rel(tgt, patch_addr)
    _log.debug("call patch : %s" % call_patch_str)
    call_patch = asmb(call_patch_str, loc_db)
    final_patch += call_patch
    _log.debug("call patch asmb : %s" % encode_hex(call_patch))
else:
    #add the original bytes
    final_patch += instr.b
Then it repairs the execution flow:
# cleaning the control flow by patching with real jmps locs
if 'cond' in link:
    t_addr = int(link['true_next'], 16)
    f_addr = int(link['false_next'], 16)
    jcc = link['cond'].replace('CMOV', 'J')
    _log.info("%s %#x" % (jcc, t_addr))
    _log.info("JMP %#x" % f_addr)

    patch1_str = "%s %s" % (jcc, rel(t_addr, patch_addr))
    jmp_patches += asmb(patch1_str, loc_db)
    patch_addr += len(jmp_patches)

    patch2_str = "JMP %s" % (rel(f_addr, patch_addr))
    jmp_patches += asmb(patch2_str, loc_db)
    _log.debug("jmp patches : %s; %s" % (patch1_str, patch2_str))

else:
    n_addr = int(link['next'], 16)
    _log.info("JMP %#x" % n_addr)

    patch_str = "JMP %s" % rel(n_addr, patch_addr)
    jmp_patches = asmb(patch_str, loc_db)
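A conditional assignment is replaced with a conditional jump (the CMOVcc mnemonic becomes Jcc) followed by a JMP for the false branch; otherwise a single JMP instruction is emitted.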
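To show what the resulting bytes look like, here is a sketch that hand-encodes the x86 near-jump forms (E9 rel32 and 0F 8x rel32) instead of going through miasm's assembler as the tool does with asmb. The function names, the partial condition table, and the addresses are assumptions made for the example; the displacement is measured from the end of each jump instruction.

```python
import struct

# x86 condition-code nibbles shared by CMOVcc (0F 4x) and near Jcc (0F 8x).
# Partial table, enough for the example.
CC = {"B": 0x2, "AE": 0x3, "Z": 0x4, "NZ": 0x5, "BE": 0x6, "A": 0x7,
      "L": 0xC, "GE": 0xD, "LE": 0xE, "G": 0xF}


def jmp_rel32(target, at):
    """JMP rel32 placed at address `at` (5 bytes long)."""
    return b"\xE9" + struct.pack("<i", target - (at + 5))


def jcc_rel32(cmov_mnemonic, target, at):
    """Near Jcc placed at `at` (6 bytes), condition taken from a CMOVcc name."""
    cc = CC[cmov_mnemonic.replace("CMOV", "")]
    return bytes([0x0F, 0x80 | cc]) + struct.pack("<i", target - (at + 6))


def tail_patch(link, at):
    """Bytes that replace the state-variable update at the end of a block."""
    if "cond" in link:
        patch = jcc_rel32(link["cond"], int(link["true_next"], 16), at)
        # The unconditional jump sits right after the conditional one.
        patch += jmp_rel32(int(link["false_next"], 16), at + len(patch))
        return patch
    return jmp_rel32(int(link["next"], 16), at)


link = {"cond": "CMOVNZ", "true_next": "0x401100", "false_next": "0x401200"}
print(tail_patch(link, 0x401000).hex())  # 0f85fa000000e9f5010000
```

The second jump has to be encoded against its own address, which is why the tool advances patch_addr by the length of the first jump before assembling the JMP.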
That concludes the source analysis. As the figure below shows, the result is quite good:

Shortcomings and Improvements
0x02.1 No PE Support
The tool computes the base address by reading the sh attribute:
section_ep = cont.bin_stream.bin.virt.parent.getsectionbyvad(cont.entry_point).sh
bin_base_addr = section_ep.addr - section_ep.offset
_log.info('bin_base_addr: %#x' % bin_base_addr)

So I first added a baseaddr argument:
parser.add_argument('-b',"--baseaddr", help="file base address")
Then the file type is checked:
try:
    if args.baseaddr:
        _log.info('Base Address:'+args.baseaddr)
        baseaddr=int(args.baseaddr,16)
    elif cont.executable.isPE():
        baseaddr=0x400C00
        _log.info('Base Address:%x'%baseaddr)
except AttributeError:
    section_ep = cont.bin_stream.bin.virt.parent.getsectionbyvad(cont.entry_point).sh
    baseaddr = section_ep.addr - section_ep.offset
    _log.info('Base Address:%x'%baseaddr)
If the file is a PE and no baseaddr argument is given, the value is simply set to 0x400C00; for an ELF file, the sh attribute is read as before.
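For reference, the ELF expression above (section address minus file offset) has a direct PE counterpart: ImageBase + section VirtualAddress - PointerToRawData. The sketch below plugs in commonly seen values (ImageBase 0x400000, first section at RVA 0x1000, raw offset 0x400) and arrives at the same 0x400C00. These header values are assumptions that vary from binary to binary, and the function name is hypothetical; for any other layout the -b option should be used.

```python
def va_to_file_delta(image_base, section_rva, raw_offset):
    """Difference between a virtual address and its offset in the file."""
    return image_base + section_rva - raw_offset


# Assumed header values, not read from any particular sample.
print(hex(va_to_file_delta(0x400000, 0x1000, 0x400)))  # 0x400c00
```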
0x02.2 Locating the CMOV Instruction
elif len(var_asg) > 1:
    #extracting the condition from the last 3rd line
    cond_mnem = last_blk.lines[-3].name
    _log.debug('cond used: %s' % cond_mnem)
    var_asg['cond'] = cond_mnem
    var_asg['true_next'] = hex(var_asg['true_next'])
    var_asg['false_next'] = hex(var_asg['false_next'])
# map the conditions and possible values dictionary to the cfg info
fixed_cfg[hex(addr)] = var_asg
The CMOV instruction is usually the third-to-last statement, but during testing I found blocks in which it is the fourth-to-last:

So the code is changed as follows:
elif len(var_asg) > 1:
    #extracting the condition from the last 3rd line
    cond_mnem = last_blk.lines[-3].name
    _log.debug('cond used: %s' % cond_mnem)
    if cond_mnem=='MOV':
        cond_mnem = last_blk.lines[-4].name
    var_asg['cond'] = cond_mnem
    var_asg['true_next'] = hex(var_asg['true_next'])
    var_asg['false_next'] = hex(var_asg['false_next'])
# map the conditions and possible values dictionary to the cfg info
fixed_cfg[hex(addr)] = var_asg
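A variant that does not depend on a fixed position is to scan the block backwards for the first mnemonic that starts with CMOV. This is only a sketch of that idea on plain strings; `find_cmov_mnemonic` is a hypothetical helper and the mnemonic lists are made up, although in the tool they could be built from the name attribute of each entry in last_blk.lines.

```python
def find_cmov_mnemonic(mnemonics):
    """Return the last CMOVcc mnemonic in a block, or None if there is none."""
    for name in reversed(mnemonics):
        if name.startswith("CMOV"):
            return name
    return None


# CMOV third from the end (the usual case).
usual = ["MOV", "MOV", "TEST", "CMOVNZ", "MOV", "JMP"]
# CMOV fourth from the end (the case that broke the original lookup).
shifted = ["MOV", "TEST", "CMOVZ", "MOV", "MOV", "JMP"]

print(find_cmov_mnemonic(usual), find_cmov_mnemonic(shifted))  # CMOVNZ CMOVZ
```

Returning None when no CMOV is present lets the caller decide how to handle a block that does not match the expected pattern.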
0x02.3 Calls to Imported Functions
if instr.is_subcall():
    #generate asm for fixed calls with relative addrs
    patch_addr = start_addr + len(final_patch)
    tgt = loc_db.get_location_offset(instr.args[0].loc_key)
    _log.info("CALL %#x" % tgt)
    call_patch_str = "CALL %s" % rel(tgt, patch_addr)
    _log.debug("call patch : %s" % call_patch_str)
    call_patch = asmb(call_patch_str, loc_db)
    final_patch += call_patch
    _log.debug("call patch asmb : %s" % encode_hex(call_patch))
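This only checks whether the instruction is a call. It does not check whether the call target is the address of an imported function, that is, whether the operand is a memory reference or a register rather than a direct location, as in the two cases shown below: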


The code is changed as follows:
if instr.is_subcall():
    if isinstance(instr.args[0],ExprMem) or isinstance(instr.args[0],ExprId):
        final_patch += instr.b
    else:
        #generate asm for fixed calls with relative addrs
        patch_addr = start_addr + len(final_patch)
        tgt = loc_db.get_location_offset(instr.args[0].loc_key)
        _log.info("CALL %#x" % tgt)
        call_patch_str = "CALL %s" % rel(tgt, patch_addr)
        _log.debug("call patch : %s" % call_patch_str)
        call_patch = asmb(call_patch_str, loc_db)
        final_patch += call_patch
        _log.debug("call patch asmb : %s" % encode_hex(call_patch))
0x02.4 get_phi_vars
def get_phi_vars(ircfg):
    res = []
    blks = list(ircfg.blocks)
    irblock = (ircfg.blocks[blks[-1]])

    if irblock_has_phi(irblock):
        for dst, sources in viewitems(irblock[0]):
            phi_vars = sources.args
            parent_blks = get_phi_sources_parent_block(
                ircfg,
                irblock.loc_key,
                phi_vars
            )

    for var, loc in parent_blks.items():
        irblock = ircfg.get_block(list(loc)[0])
        for asg in irblock:
            dst, src = asg.items()[0]
            if dst == var:
                res += [int(src)]

    return res
This does not handle the case where the state-variable values are held in local variables:
mov eax, 0B3584423h
mov ecx, 0CAE581F5h
...
test cl, 1
mov r9d, [rbp+18Ch]
mov r14d, [rbp+17Ch]
cmovnz r9d, r14d
mov [rbp+230h], r9d
jmp loc_4044AE
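In that case the state-variable values are usually found in the head nodes of the def-use graph, so the function is changed as follows: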
def get_phi_vars(ircfg,loc_db,mdis):
    res = []
    blks = list(ircfg.blocks)
    irblock = (ircfg.blocks[blks[-1]])

    if irblock_has_phi(irblock):
        for dst, sources in viewitems(irblock[0]):
            phi_vars = sources.args
            parent_blks = get_phi_sources_parent_block(
                ircfg,
                irblock.loc_key,
                phi_vars
            )

    for var, loc in parent_blks.items():
        irblock = ircfg.get_block(list(loc)[0])
        for asg in irblock:
            dst, src = asg.items()[0]
            if dst == var and isinstance(src,ExprInt):
                res += [int(src)]
            elif dst==var and isinstance(src,ExprOp):
                reachings = ReachingDefinitions(ircfg)
                digraph = DiGraphDefUse(reachings)
                # the state var always a leaf
                for head in digraph.heads():
                    if head.var == src.args[0]:
                        head_offset=loc_db.get_location_offset(head.label)
                        if isinstance(mdis.dis_instr(head_offset).args[1],ExprInt):
                            res+=[int(mdis.dis_instr(head_offset).args[1])]

    return res
References
- MODeflattener – Miasm’s OLLVM Deflattener
- Deobfuscation: recovering an OLLVM-protected program (Quarkslab)
- 利用符號執行去除控制流平坦化 (removing control-flow flattening with symbolic execution), Tencent SRC
- Automated Detection of Control-flow Flattening
- miasm (GitHub)
- OLLVM Flattening (GitHub)
- 基于LLVM Pass實現控制流平坦化 (implementing control-flow flattening with an LLVM pass), Kanxue