host碰撞python3腳本實現
VSole2022-01-14 15:26:38
引言
host碰撞,懂的都懂。這里主要講下腳本實現邏輯。
host碰撞邏輯
ip與host組合,進行嘗試,修改請求頭中的host。本腳本主要是多線程先進行DNS解析檢測,排除外網域名,提高掃描準確度,再進行host碰撞掃描。
腳本實現邏輯
1、ip_file加載ip,host_file加載域名(一行一個)
2、實現函數:
domainCheckThreadMain
邏輯:首先會對host_file里的域名進行dns解析檢測,如果能直接解析的,這種域名就不屬于內網域名。
可以直接排除掉(這里有個困惑點,通過一些工具收集資產時,會發現解析地址為內網地址,所以會檢測解析的地址是否為內網地址,如果是的話,那么依然進行碰撞檢測),過濾后的域名結果會保存在self.new_host_save_file
3、實現函數:hostScanThreadMain
邏輯:對ip(會拼接http、https組合成url)、host進行組合,存入到隊列中,方便多線程取數據,只保存狀態碼400以下后的(這里也可以直接修改成狀態碼為200的),因為感覺不夠了解,不知道會不會有什么其他特殊的情況,所以這里沒有去直接匹配200狀態碼。
4、最后的結果會保存為xlsx表,因為xlsx表方便去篩選。結果說明,目前發現比如nginx默認頁面,可能host怎么改,狀態碼都是200,所以對于結果的篩選主要是通過響應包長度、title快速排查。
示例代碼需要安裝的庫
pip install requestspip install openpyxlpip install dnspython
貼實現代碼:
import osimport sysimport time
import requestsimport threadingimport queueimport randomimport reimport dns.resolverimport openpyxl
class HostScan(object):
def __init__(self,ip_file,host_file,userAgent_file,thread_num,proxies=None,isDomainCheck=True): '''
:param ip_file: :param host_file: :param userAgent_file: :param thread_num: :param proxies: :param isDomainCheck: 是否先進行域名解析檢測,如果在外網能解析的對象,則可以直接過濾 ''' self.ip_list = self.load_ip(ip_file) self.host_list = self.load_host(host_file) self.userAgent_list = self.load_userAgent(userAgent_file) self.domain_queue = queue.Queue(maxsize=0) self.thread_num = thread_num self.proxies = proxies self.dns_resolver = dns.resolver.Resolver() self.dns_resolver.nameservers = ["114.114.114.114", "8.8.8.8"] self.search_title_re = re.compile("([\s\S]*?)") self.floder_path = os.path.dirname(os.path.abspath(sys.argv[0])) self.result_save_file = os.path.join(self.floder_path,'hostScanResult.txt') self.result_list = [] self.result_xlsx_file = os.path.join(self.floder_path,'hostScanResult.xlsx') self.new_host_save_file = os.path.join(self.floder_path, 'new_host.txt') self.isDomainCheck = isDomainCheck
def load_ip(self,file): data_set = set() with open(file, "r", encoding="utf-8") as f: for row in f: row = row.strip() if row: row = row.split()[0] data_set.add(row) return list(data_set)
def load_host(self,file): data_set = set() with open(file, "r", encoding="utf-8") as f: for row in f: row = row.strip() if row: row = row.split()[0] data_set.add(row) return list(data_set)
def check_inside_ip(self,ip): ''' 判斷是否為內網ip :param ip: 要檢測的ip :return: True 則為內網ip,False則為外網ip ''' inside_ip_segment = ["192.168.0.0/16", "172.16.0.0/12", "10.0.0.0/8"] for ip_segment in inside_ip_segment: if self.check_ip_by_network_segment(ip, ip_segment): return True if ip in ["127.0.0.1", "0.0.0.0"]: return True return False
def check_ip_by_network_segment(self,ip, ip_segment): ''' 判斷ip與ip_segment是否為同網段 判斷思路,將ip的4個部分,每個部分都變成2進制形式,如果不滿8位,則在后面補0 將ip網段同理分成2進制 然后比較前掩碼個數,比如掩碼為25,那么就比較前25位是否相同,如果相同,則說明同網段 :param ip: 172.16.1.1 :param ip_segment: 172.16.0.0/24 :return: 同網段,返回True,不同段網返回False ''' ip_list = ip.split(".") ip_two = "" for ip_num in ip_list: ip_2 = bin(int(ip_num)).replace("0b", "") ip_2 = "0" * (8 - len(ip_2)) + ip_2 ip_two += ip_2 ip_segment_net, ip_segment_ym = ip_segment.split("/") ip_segment_ym = int(ip_segment_ym) ip_segment_net_list = ip_segment_net.split(".") ip_segment_net_2 = "" for ip_num in ip_segment_net_list: ip_2 = bin(int(ip_num)).replace("0b", "") ip_2 = "0" * (8 - len(ip_2)) + ip_2 ip_segment_net_2 += ip_2 if ip_segment_net_2[:ip_segment_ym] == ip_two[:ip_segment_ym]: return True else: return False
def load_userAgent(self,file): userAgent_set = set() with open(file, "r", encoding="utf-8") as f: for row in f: row = row.strip() if row: userAgent_set.add(row) return list(userAgent_set)
def create_ip_host_queue(self,ip_list,host_list): ''' 生成ip與queue的組合 :return: ''' q = queue.Queue(maxsize=0) for ip in ip_list: for host in host_list: q.put(("http://"+ip,host),timeout=1) q.put(("https://" + ip, host), timeout=1) return q
def result_save(self,msg): with open(self.result_save_file,"a+",encoding="utf-8") as f: f.write(msg+"")
def checkDomainStatus(self,domain): ''' 檢測域名是否可在外網解析,需要排除解析成內網域名的情況 可解析 返回True 不可解析返回False 解析的IP只要有一個IP是內網域名,就認為不可解析 返回False :param domain: :return: ''' try: result = self.dns_resolver.query(domain, "A") for i in result.response.answer: if i.rdtype == 1: for j in i.items: if self.check_inside_ip(j.to_text()): # 內網域名 return False return True except Exception as e: # print(e.__str__()) return False
def hostScanRun(self): ''' 實際掃描函數 :return: ''' while not self.data_queue.empty(): try: url,host = self.data_queue.get(timeout=1) except Exception as e: return None headers = { "user-agent": random.choice(self.userAgent_list), "host": host, } try: resp = requests.get(url, headers=headers, proxies=self.proxies, verify=False, allow_redirects=False, timeout=5) except Exception as e: continue content_length = len(resp.content) result = self.search_title_re.search(resp.text) if result: title = result.groups()[0] else: title = "" msg = f'url:{url}\thost:{host}\tstatus_code:{resp.status_code}\tlength:{content_length}\ttitle:{title}' if resp.status_code < 400: print(msg) self.result_save(msg) self.result_list.append([url,host,resp.status_code,content_length,title])
def domainCheckRun(self): while not self.domain_queue.empty(): try: domain= self.domain_queue.get(timeout=1) except Exception as e: break if not self.checkDomainStatus(domain): # 不可解析的加入到檢測列表中 self.host_list.append(domain)
def domainCheckThreadMain(self): print(f"域名外網解析檢測,檢測前共有域名數:{len(self.host_list)}") for host in self.host_list: self.domain_queue.put(host,timeout=0.5) self.host_list = [] # 清空 td_list = [] td = threading.Thread(target=self.showProgress,args=("域名解析檢測進度",self.domain_queue,self.domain_queue.qsize()) ) td_list.append(td) for i in range(self.thread_num): td = threading.Thread(target=self.domainCheckRun, ) td_list.append(td) for td in td_list: td.start() for td in td_list: td.join() print(f"域名外網解析檢測結束,發現不可解析域名數:{len(self.host_list)}")
def showProgress(self,title,queue_object,count_init,time_show=3): ''' 進度條展示 :return: ''' while not queue_object.empty(): print(f"\r{title}:{count_init-queue_object.qsize()}/{count_init}",end="") time.sleep(time_show) print("")
def hostScanThreadMain(self): ''' 異步掃描,主函數 :return: ''' td_list = [] td = threading.Thread(target=self.showProgress, args=("ip、host檢測進度", self.data_queue, self.data_queue.qsize())) td_list.append(td) for i in range(self.thread_num): td = threading.Thread(target=self.hostScanRun,) td_list.append(td) for td in td_list: td.start() for td in td_list: td.join()
# 結果轉xlsx self.result_list = sorted(self.result_list,key=lambda x:x[3],reverse=True) wb = openpyxl.Workbook() ws = wb.active ws.title = "host碰撞結果" ws.append(["url","host","status_code","content_length","title"]) for row in self.result_list: ws.append(row) wb.save(self.result_xlsx_file)
def runMain(self): if self.isDomainCheck: # dns解析檢測 self.domainCheckThreadMain() new_host_list = [ host+"" for host in self.host_list] with open(self.new_host_save_file,"w",encoding="utf-8") as f: f.writelines(new_host_list) self.data_queue = self.create_ip_host_queue(self.ip_list, self.host_list) self.hostScanThreadMain()
if __name__ == '__main__': ip_file = "ip.txt" host_file = 'host.txt' userAgent_file = "userAgent.txt" thread_num = 10 proxies = { "http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080", } h = HostScan(ip_file,host_file,userAgent_file,thread_num,proxies=None,isDomainCheck=True) h.runMain()
VSole
網絡安全專家