【白帽故事】打造一款多線程目錄爆破掃描器
VSole2022-03-04 12:35:33
聲明:文章中涉及的程序(方法)可能帶有攻擊性,僅供安全研究與教學之用,讀者將其信息做其他用途,由用戶承擔全部法律及連帶責任,文章作者不承擔任何法律及連帶責任。 |
背景介紹:
今天的分享來自國外一位ID為Mayank Pandey的白帽子,他分享了使用Python多線程功能打造的一款多線程網站目錄爆破掃描器。廢話不多說,上代碼(代碼很少,100行不到):
from threading import Threadimport time,requests,sys,os.path
def usage(): print("----------USAGE INSTRUCTION ---------") print(f"{sys.argv[0]} URL WORDLIST NUMBER_OF_THREADS(Default is 10)") sys.exit()
def prepare(myList,numOfChunks): for i in range(0, len(myList), numOfChunks): yield myList[i:i + numOfChunks]
def brute(myList,url): start=time.perf_counter() for lists in myList: threads.append(Thread(target=worker,args=(lists,url),daemon=True)) for thread in threads: try: thread.start() except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() for thread in threads: try: thread.join() except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() finish=time.perf_counter() print(f"\t\t Checked {total_len} Directories in {round(finish-start,2)} Seconds")
def worker(lists,url): try: for word in lists: if word.startswith("/"): word=word[1:] url2=url+"/"+word.strip() r=requests.get(url2) if str(r.status_code) in match: print(f"/{word.strip():<40} [ Status: {r.status_code} Length:{len(r.content)} ]") except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() except Exception as e: print(f"An error Occurred : {e}") sys.exit()
if __name__ == "__main__": try: match=['200','301','302','401','403','429'] #change this to filter responses try: if sys.argv[1]: url=sys.argv[1] if sys.argv[2]: wordlist=sys.argv[2] try: if sys.argv[3]: numOfThreads=int(sys.argv[3]) except: numOfThreads=10 except: usage() if os.path.isfile(wordlist)==False: print(f"The file {wordlist} doesn't exist") sys.exit() with open(wordlist,'r') as w: myList=w.readlines() total_len=len(myList) final=[] threads=[] if numOfThreads>total_len or numOfThreads<0: print("Too High Value for Threads with Respect to Input Word-list") sys.exit(1) numOfChunks=len(myList)//numOfThreads if url.endswith("/"): url=url[0:-1] print(f''' ====================================== URL --> {url} Word-list --> {wordlist} Threads --> {numOfThreads} Status Codes --> {','.join([w for w in match])} ====================================== ''') print("------- Started Brute forcing Directories -------") myList_new=prepare(myList,numOfChunks) brute(myList_new,url) except Exception as e: print(f"An error Occurred : {e}") sys.exit()
代碼詳解:
導入和使用詳細信息:
from threading import Threadimport time,requests,sys,os.path def usage(): print("----------USAGE INSTRUCTION ---------") print(f"{sys.argv[0]} URL WORDLIST NUMBER_OF_THREADS") sys.exit()
這一段代碼主要是導入線程和相關模塊,Usage()函數向用戶展示了如何使用程序,它在命令行參數不足時會被觸發。
分塊處理:
def prepare(myList,numOfChunks): for i in range(0, len(myList), numOfChunks): yield myList[i:i + numOfChunks]
該函數是線程數發揮作用的主要地方,利用它對主列表進行分塊處理。
多線程邏輯:
def brute(myList,url): start=time.perf_counter() for lists in myList: threads.append(Thread(target=worker,args=(lists,url),daemon=True)) for thread in threads: try: thread.start() except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() for thread in threads: try: thread.join() except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() finish=time.perf_counter() print(f"\t\t Checked {total_len} Directories in {round(finish-start,2)} Seconds")
def worker(lists,url): try: for word in lists: if word.startswith("/"): word=word[1:] url2=url+"/"+word.strip() r=requests.get(url2) if str(r.status_code) in match: print(f"/{word.strip():<40} [ Status: {r.status_code} Length:{len(r.content)} ]") except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() except Exception as e: print(f"An error Occurred : {e}") sys.exit()
上面定義了兩個函數brute()和worker(),線程會從brute()函數開始,worker()函數將負責處理請求。
threads.append(Thread(target=worker,args=(lists,url),daemon=True))
從分塊列表中提取列表,并使用它啟動Thread。
線程將以Worker函數作為目標,這些函數將被附加到包含所有線程的列表中,在此之后,將啟動并關聯所有線程。
for thread in threads: thread.start() for thread in threads: thread.join()
thread.start():
創建線程實例時,它并不會立即執行,所以需要調用它的start()方法。一旦啟動,線程將獨立運行,直到目標函數返回。
thread.join():
在調用join()方法時,調用線程會被阻塞,直到終止線程對象(在其上調用線程),線程對象會在以下任何一種情況下終止:
- 正常情況
- 處理一個不當的異常時
- 發生超時
這有助于所有線程完成其工作并正常退出。
在Worker方法關閉(所有線程都關閉)之后,控制將返回到brute()函數,然后繼續接下來正常的程序流程。
大概就是這樣,使用Python的多線程創建一個簡單的掃描工具,但是請記住不要使用太多的線程來運行你的程序,因為一旦處理不當,可能會導致其它程序的崩潰。
當然這個掃描器的代碼目前已經放在Github上
VSole
網絡安全專家