<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    【技術分享】OLLVM Deflattener

    VSole2021-07-29 16:14:02

    前言

    筆者于五月份時遇到幾個經控制流平坦化的樣本,由于之前沒有接觸過這方面知識,未深入分析。

    七月初看到一篇博客——MODeflattener – Miasm’s OLLVM Deflattener,作者同時在Github上公開其源碼,故筆者決定對其一探究竟。該工具使用miasm框架,Quarklab于2014年的博客——Deobfuscation: recovering an OLLVM-protected program中已提到此框架,但其采用符號執行以遍歷代碼,計算每個Relevant Block目標終點,恢復執行流程。騰訊SRC于2017年的博客——利用符號執行去除控制流平坦化中亦是采用符號執行(基于angr框架),而MODeflattener采用靜態分析方式來恢復執行流程。筆者于下文首先結合具體樣本分析其源碼及各函數功能,之后指出該工具不足之處并提出改進方案。關于涉及到的miasm框架中類,函數,屬性及方法可參閱miasm Documentation與miasm源碼。LLVM,OLLVM,LLVM Pass及控制流平坦化不再贅述,具體可參閱基于LLVM Pass實現控制流平坦化與OLLVM Flattening源碼。

    源碼分析

    0x01.1 檢測平坦化

    def calc_flattening_score(asm_graph):    score = 0.0    for head in asm_graph.heads_iter():        dominator_tree = asm_graph.compute_dominator_tree(head)        for block in asm_graph.blocks:            block_key = asm_graph.loc_db.get_offset_location(block.lines[0].offset)            dominated = set(                [block_key] + [b for b in dominator_tree.walk_depth_first_forward(block_key)])            if not any([b in dominated for b in asm_graph.predecessors(block_key)]):                continue            score = max(score, len(dominated)/len(asm_graph.nodes()))    return score
    

    計算每個塊支配塊數量,除以圖中所有塊,取最高分——若不低于0.9,證明該函數經過控制流平坦化。如下圖:

    塊b支配除塊a外其余所有塊(包含自身),那么其得分為9/10=0.9即最高分。讀者可自行查閱Dominator Tree及Predecessors相關數據結構知識,此外不作展開。

    0x01.2 獲取平坦化相關信息

    def get_cff_info(asmcfg):    preds = {}    for blk in asmcfg.blocks:        offset = asmcfg.loc_db.get_location_offset(blk.loc_key)        preds[offset] = asmcfg.predecessors(blk.loc_key)    # pre-dispatcher is the one with max predecessors    pre_dispatcher = sorted(preds, key=lambda key: len(preds[key]), reverse=True)[0]    # dispatcher is the one which suceeds pre-dispatcher    dispatcher = asmcfg.successors(asmcfg.loc_db.get_offset_location(pre_dispatcher))[0]    dispatcher = asmcfg.loc_db.get_location_offset(dispatcher)
        # relevant blocks are those which preceed the pre-dispatcher    relevant_blocks = []    for loc in preds[pre_dispatcher]:        offset = asmcfg.loc_db.get_location_offset(loc)        relevant_blocks.append(get_block_father(asmcfg, offset))
        return relevant_blocks, dispatcher, pre_dispatcher
    

    首先擁有最大數量Predecessor者即為Pre-Dispatcher,而Pre-Dispatcher后繼為Dispatcher,Pre-Dispatcher所有前趨均為Relevant Block。

    dispatcher_blk = main_asmcfg.getby_offset(dispatcher)    dispatcher_first_instr = dispatcher_blk.lines[0]    state_var = dispatcher_first_instr.get_args_expr()[1]
    

    Dispatcher塊第一條指令為狀態變量:

    0x01.3 去平坦化

    for addr in relevant_blocks:        _log.debug("Getting info for relevant block @ %#x"%addr)        loc_db = LocationDB()        mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)        mdis.dis_block_callback = stop_on_jmp        asmcfg = mdis.dis_multiblock(addr)
            lifter = machine.lifter_model_call(loc_db)        ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)        ircfg_simplifier = IRCFGSimplifierCommon(lifter)        ircfg_simplifier.simplify(ircfg, addr)
    

    遍歷每個Relevant Block,為其創建ASM CFG及IR CFG,作者實現了一save_cfg函數可將CFG輸出(需要安裝Graphviz):

    def save_cfg(cfg, name):    import subprocess    open(name, 'w').write(cfg.dot())    subprocess.call(["dot", "-Tpng", name, "-o", name.split('.')[0]+'.png'])    subprocess.call(["rm", name])
    

    之后查找所有使用狀態變量及定義狀態變量語句:

    nop_addrs = find_state_var_usedefs(ircfg, state_var)...def find_state_var_usedefs(ircfg, search_var):    var_addrs = set()    reachings = ReachingDefinitions(ircfg)    digraph = DiGraphDefUse(reachings)    # the state var always a leaf    for leaf in digraph.leaves():        if leaf.var == search_var:            for x in (digraph.reachable_parents(leaf)):                var_addrs.add(ircfg.get_block(x.label)[x.index].instr.offset)    return var_addrs
    

    可以看到DefUse圖與語句對應關系:

    查找狀態變量所有可能值:

    def find_var_asg(ircfg, var):    val_list = []    res = {}    for lbl, irblock in viewitems(ircfg.blocks):        for assignblk in irblock:            result = set(assignblk).intersection(var)            if not result:                continue            else:                dst, src = assignblk.items()[0]                if isinstance(src, ExprInt):                    res['next'] = int(src)                    val_list += [int(src)]                elif isinstance(src, ExprSlice):                    phi_vals = get_phi_vars(ircfg)                    res['true_next'] = phi_vals[0]                    res['false_next'] = phi_vals[1]                    val_list += phi_vals    return res, val_list
    

    為狀態變量賦值有兩種情況——一種是直接賦值,不涉及條件判斷及分支跳轉,其對應ExprInt:

    另一種是CMOV條件賦值,其對應ExprSlice:

    之后計算每一狀態變量可能值對應Relevant Block:

    # get the value for reaching a particular relevant block    for lbl, irblock in viewitems(main_ircfg.blocks):        for assignblk in irblock:            asg_items = assignblk.items()            if asg_items:    # do not enter if nop                dst, src = asg_items[0]                if isinstance(src, ExprOp):                    if src.op == 'FLAG_EQ_CMP':                        arg = src.args[1]                        if isinstance(arg, ExprInt):                            if int(arg) in val_list:                                cmp_val = int(arg)                                var, locs = irblock[-1].items()[0]                                true_dst = main_ircfg.loc_db.get_location_offset(locs.src1.loc_key)                                backbone[hex(cmp_val)] = hex(true_dst)
    

    如下圖:

    將fixed_cfg中存儲的狀態變量值替換為目標Relevant Block地址:

    for offset, link in fixed_cfg.items():        if 'cond' in link:            tval = fixed_cfg[offset]['true_next']            fval = fixed_cfg[offset]['false_next']            fixed_cfg[offset]['true_next'] = backbone[tval]            fixed_cfg[offset]['false_next'] = backbone[fval]        elif 'next' in link:            fixed_cfg[offset]['next'] = backbone[link['next']]        else:            # the tail doesn't has any condition            tail = int(offset, 16)
    

    將所有無關指令替換為NOP:

    for addr in rel_blk_info.keys():        _log.info('=> cleaning relevant block @ %#x' % addr)        asmcfg, nop_addrs = rel_blk_info[addr]        link = fixed_cfg[hex(addr)]        instrs = [instr for blk in asmcfg.blocks for instr in blk.lines]        last_instr = instrs[-1]        end_addr = last_instr.offset + last_instr.l        # calculate original length of block before patching        orig_len = end_addr - addr        # nop the jmp to pre-dispatcher        nop_addrs.add(last_instr.offset)        _log.debug('nop_addrs: ' + ', '.join([hex(addr) for addr in nop_addrs]))        patch = patch_gen(instrs, asmcfg.loc_db, nop_addrs, link)        patch = patch.ljust(orig_len, b"\x90")        patches[addr] = patch        _log.debug('patch generated %s' % encode_hex(patch))
        _log.info(">>> NOPing Backbone (%#x - %#x) <<<" % (backbone_start, backbone_end))    nop_len = backbone_end - backbone_start    patches[backbone_start] = b"\x90" * nop_len
    

    其中patch_gen函數定義如下:

    def patch_gen(instrs, loc_db, nop_addrs, link):    final_patch = b""    start_addr = instrs[0].offset
        for instr in instrs:        #omitting useless instructions        if instr.offset not in nop_addrs:            if instr.is_subcall():                #generate asm for fixed calls with relative addrs                patch_addr = start_addr + len(final_patch)                tgt = loc_db.get_location_offset(instr.args[0].loc_key)                _log.info("CALL %#x" % tgt)                call_patch_str = "CALL %s" % rel(tgt, patch_addr)                _log.debug("call patch : %s" % call_patch_str)                call_patch = asmb(call_patch_str, loc_db)                final_patch += call_patch                _log.debug("call patch asmb : %s" % encode_hex(call_patch))            else:                #add the original bytes                final_patch += instr.b
        patch_addr = start_addr + len(final_patch)    _log.debug("jmps patch_addr : %#x", patch_addr)    jmp_patches = b""    # cleaning the control flow by patching with real jmps locs    if 'cond' in link:        t_addr = int(link['true_next'], 16)        f_addr = int(link['false_next'], 16)        jcc = link['cond'].replace('CMOV', 'J')        _log.info("%s %#x" % (jcc, t_addr))        _log.info("JMP %#x" % f_addr)
            patch1_str = "%s %s" % (jcc, rel(t_addr, patch_addr))        jmp_patches += asmb(patch1_str, loc_db)        patch_addr += len(jmp_patches)
            patch2_str = "JMP %s" % (rel(f_addr, patch_addr))        jmp_patches += asmb(patch2_str, loc_db)        _log.debug("jmp patches : %s; %s" % (patch1_str, patch2_str))
        else:        n_addr = int(link['next'], 16)        _log.info("JMP %#x" % n_addr)
            patch_str = "JMP %s" % rel(n_addr, patch_addr)        jmp_patches = asmb(patch_str, loc_db)
            _log.debug("jmp patches : %s" % patch_str)
        _log.debug("jmp patches asmb : %s" % encode_hex(jmp_patches))    final_patch += jmp_patches
        return final_patch
    

    首先檢查每個塊中是否存在函數調用,如果存在,計算其修補后偏移;否則直接使用原指令:

    if instr.is_subcall():                #generate asm for fixed calls with relative addrs                patch_addr = start_addr + len(final_patch)                tgt = loc_db.get_location_offset(instr.args[0].loc_key)                _log.info("CALL %#x" % tgt)                call_patch_str = "CALL %s" % rel(tgt, patch_addr)                _log.debug("call patch : %s" % call_patch_str)                call_patch = asmb(call_patch_str, loc_db)                final_patch += call_patch                _log.debug("call patch asmb : %s" % encode_hex(call_patch))            else:                #add the original bytes                final_patch += instr.b
    

    之后修復執行流程:

    # cleaning the control flow by patching with real jmps locs    if 'cond' in link:        t_addr = int(link['true_next'], 16)        f_addr = int(link['false_next'], 16)        jcc = link['cond'].replace('CMOV', 'J')        _log.info("%s %#x" % (jcc, t_addr))        _log.info("JMP %#x" % f_addr)
            patch1_str = "%s %s" % (jcc, rel(t_addr, patch_addr))        jmp_patches += asmb(patch1_str, loc_db)        patch_addr += len(jmp_patches)
            patch2_str = "JMP %s" % (rel(f_addr, patch_addr))        jmp_patches += asmb(patch2_str, loc_db)        _log.debug("jmp patches : %s; %s" % (patch1_str, patch2_str))
        else:        n_addr = int(link['next'], 16)        _log.info("JMP %#x" % n_addr)
            patch_str = "JMP %s" % rel(n_addr, patch_addr)        jmp_patches = asmb(patch_str, loc_db)
    

    如果是條件賦值,替換為條件跳轉;否則直接替換為JMP指令。

    至此,源碼分析結束。如下圖所示,可以看到其效果很不錯:

    不足及改進

    0x02.1 不支持PE文件

    其計算基址是通過訪問sh屬性:

    section_ep = cont.bin_stream.bin.virt.parent.getsectionbyvad(cont.entry_point).sh    bin_base_addr = section_ep.addr - section_ep.offset    _log.info('bin_base_addr: %#x' % bin_base_addr)
    

    故筆者首先增加一參數baseaddr:

    parser.add_argument('-b',"--baseaddr", help="file base address")
    

    之后判斷文件類型:

       try:        if args.baseaddr:            _log.info('Base Address:'+args.baseaddr)            baseaddr=int(args.baseaddr,16)        elif cont.executable.isPE():            baseaddr=0x400C00            _log.info('Base Address:%x'%baseaddr)    except AttributeError:        section_ep = cont.bin_stream.bin.virt.parent.getsectionbyvad(cont.entry_point).sh        baseaddr = section_ep.addr - section_ep.offset        _log.info('Base Address:%x'%baseaddr)
    

    如果是PE文件且未提供baseaddr參數,則直接給其賦值為0x400C00;若是ELF文件,則訪問sh屬性。

    0x02.2 獲取CMOV指令

    elif len(var_asg) > 1:            #extracting the condition from the last 3rd line            cond_mnem = last_blk.lines[-3].name            _log.debug('cond used: %s' % cond_mnem)            var_asg['cond'] = cond_mnem            var_asg['true_next'] = hex(var_asg['true_next'])            var_asg['false_next'] = hex(var_asg['false_next'])            # map the conditions and possible values dictionary to the cfg info            fixed_cfg[hex(addr)] = var_asg
    

    通常CMOV指令位于倒數第三條語句,但筆者在測試時發現部分塊CMOV指令位于倒數第四條語句:

    故改進如下:

    elif len(var_asg) > 1:            #extracting the condition from the last 3rd line            cond_mnem = last_blk.lines[-3].name            _log.debug('cond used: %s' % cond_mnem)            if cond_mnem=='MOV':                cond_mnem = last_blk.lines[-4].name            var_asg['cond'] = cond_mnem            var_asg['true_next'] = hex(var_asg['true_next'])            var_asg['false_next'] = hex(var_asg['false_next'])            # map the conditions and possible values dictionary to the cfg info            fixed_cfg[hex(addr)] = var_asg
    

    0x02.3 調用導入函數

    if instr.is_subcall():                #generate asm for fixed calls with relative addrs                patch_addr = start_addr + len(final_patch)                tgt = loc_db.get_location_offset(instr.args[0].loc_key)                _log.info("CALL %#x" % tgt)                call_patch_str = "CALL %s" % rel(tgt, patch_addr)                _log.debug("call patch : %s" % call_patch_str)                call_patch = asmb(call_patch_str, loc_db)                final_patch += call_patch                _log.debug("call patch asmb : %s" % encode_hex(call_patch))
    

    僅判斷是否為函數調用,但并未判斷調用函數地址是否為導入函數地址,如下圖兩種情形所示:

    改進如下:

    if instr.is_subcall():                if isinstance(instr.args[0],ExprMem) or isinstance(instr.args[0],ExprId):                    final_patch += instr.b                else:                    #generate asm for fixed calls with relative addrs                    patch_addr = start_addr + len(final_patch)                    tgt = loc_db.get_location_offset(instr.args[0].loc_key)                    _log.info("CALL %#x" % tgt)                    call_patch_str = "CALL %s" % rel(tgt, patch_addr)                    _log.debug("call patch : %s" % call_patch_str)                    call_patch = asmb(call_patch_str, loc_db)                    final_patch += call_patch                    _log.debug("call patch asmb : %s" % encode_hex(call_patch))
    

    0x02.4 get_phi_vars

    def get_phi_vars(ircfg):    res = []    blks = list(ircfg.blocks)    irblock = (ircfg.blocks[blks[-1]])
        if irblock_has_phi(irblock):        for dst, sources in viewitems(irblock[0]):            phi_vars = sources.args            parent_blks = get_phi_sources_parent_block(                ircfg,                irblock.loc_key,                phi_vars            )
        for var, loc in parent_blks.items():        irblock = ircfg.get_block(list(loc)[0])        for asg in irblock:            dst, src = asg.items()[0]            if dst == var:                res += [int(src)]
        return res
    

    未考慮到由局部變量存儲狀態變量值情形:

    mov     eax, 0B3584423hmov     ecx, 0CAE581F5h...test    cl, 1mov     r9d, [rbp+18Ch]mov     r14d, [rbp+17Ch]cmovnz  r9d, r14dmov     [rbp+230h], r9djmp     loc_4044AE
    

    如果是上述情形,狀態變量值往往位于頭節點中,故改進如下:

    def get_phi_vars(ircfg,loc_db,mdis):    res = []    blks = list(ircfg.blocks)    irblock = (ircfg.blocks[blks[-1]])
        if irblock_has_phi(irblock):        for dst, sources in viewitems(irblock[0]):            phi_vars = sources.args            parent_blks = get_phi_sources_parent_block(                ircfg,                irblock.loc_key,                phi_vars            )
        for var, loc in parent_blks.items():        irblock = ircfg.get_block(list(loc)[0])        for asg in irblock:            dst, src = asg.items()[0]            if dst == var and isinstance(src,ExprInt):                res += [int(src)]            elif dst==var and isinstance(src,ExprOp):                reachings = ReachingDefinitions(ircfg)                digraph = DiGraphDefUse(reachings)                # the state var always a leaf                for head in digraph.heads():                    if head.var == src.args[0]:                        head_offset=loc_db.get_location_offset(head.label)                        if isinstance(mdis.dis_instr(head_offset).args[1],ExprInt):                            res+=[int(mdis.dis_instr(head_offset).args[1])]
        return res
    

    參閱鏈接

    • MODeflattener – Miasm’s OLLVM Deflattener
    • Deobfuscation: recovering an OLLVM-protected program—Quasklab
    • 利用符號執行去除控制流平坦化—騰訊SRC
    • Automated Detection of Control-flow Flattening
    • miasm—Github
    • OLLVM Flattening—Github
    • 基于LLVM Pass實現控制流平坦化—看雪
    nexthex
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    前言筆者于五月份時遇到幾個經控制流平坦化的樣本,由于之前沒有接觸過這方面知識,未深入分析。七月初看到一篇
    從某新生賽入門PWN
    2022-11-26 16:02:34
    本文為看雪論壇優秀文章看雪論壇作者ID:bad_c0de在某平臺上看到了質量不錯的新生賽,難度也比較適宜,因此嘗試通過該比賽進行入門,也將自己所學分享給大家。賽題ezcmp賽題分析該程序的C代碼如下,因此我們只要使buff和test的前三十個字節相同即可。因此可以直接在比較處下斷點查看buff數組的值即可。#includechar buff[100];int v0;char buffff[]="ABCDEFGHIJKLMNOPQRSTUVWXYZ1234";char bua[]="abcdefghijklmnopqrstuvwxyz4321";char* enccrypt{ int a; for{ a=rand(); buf[i]^=buffff[i]; buff[i]^=bua[i]; for{ buf[j]=buff[i]; buf[i]+='2'; } buf[i]-=&0xff; buf[i]+=&0xff; }}int main(){ setbuf; setbuf; setbuf; puts; char buf[]="Ayaka_nbbbbbbbbbbbbbbbbb_pluss"; strcpy; char test[30]; int v0=1; srand; enccrypt; read; if(!ezr0p64賽題分析查看保護同理可以通過rop進行繞過。
    現在的CTF比賽中很難在大型比賽中看到棧溢出類型的賽題,而即使遇到了也是多種利用方式組合出現,尤其以棧遷移配合其他利用方式來達到組合拳的效果,本篇文章意旨通過原理+例題的形式帶領讀者一步步理解棧遷移的原理以及在ctf中的應用。
    0RAYS-L3HCTF2021 writeup-web
    2021-11-18 16:22:06
    Easy PHP頁面看起來沒毛病 復制一下就發現有問題看響應包hex 再url編碼即可GET /?解法一 hash長度擴展攻擊原理hash長度擴展攻擊的基本場景:網上很多講的很難理解簡單來說 已知?做題根據上面的對應關系 使用工具./hash_extender?
    從fofa中搜索RDP,會看到它會解析出RDP的信息。本文探索如何自己實現一個。1. Nmap指紋在http
    MTCTF-2022 部分WriteUp
    2022-11-23 09:35:37
    MTCTF 本次比賽主力輸出選手Article&Messa&Oolongcode,累計解題3Web,2Pwn,1Re,1CryptoWeb★easypickle題目給出源碼:。import base64import picklefrom flask import Flask, sessionimport osimport random. @app.route('/')def hello_world(): if not session.get: session['user'] = ''.join return 'Hello {}!\x93作用同c,但是將從stack中出棧兩元素分別導入的模塊名和屬性名:此外對于藍帽杯WP還存在一個小問題,原題采用_loads函數加載pickle數據但本題是loads,在opcodes處理上會有些微不通具體來說就是用loads加載時會報錯誤如下:對著把傳入參數換成元組就行,最終的payload如下
    高版本go語言符號還原
    bang加固簡單分析
    2022-07-31 16:59:14
    dex加固,可以使用frida-dexdump可以直接dump下來。
    前言在Teaser Dragon CTF 2019中有2道考察Crypto方向的題目,一道Crypto題目,一道Web+Crypto題目,在這里對題目進行一下分析。
    跟php pwn一樣,以前遇到這樣的pwn直接都不看的,經過了解之后發現,老版本的Musl libc和新版本之間差距還比較大。結合最近幾次比賽中出現的Musl pwn,學習一下新老版本的Musl libc姿勢。
    VSole
    網絡安全專家
      亚洲 欧美 自拍 唯美 另类