巡风源码浅析之 Vulscan 分析篇

安全工具 2017-12-12

巡风是一款适用于企业内网的漏洞快速应急、巡航扫描系统,通过搜索功能可清晰的了解内部网络资产分布情况,并且可指定漏洞插件对搜索结果进行快速漏洞检测并输出结果报表。

开源地址:

https://github.com/ysrc/xunfeng

本文作者:LandGrey

0x00: VulScan 介绍

查看介绍前,请先查看《巡风源码浅析之 Nascan 分析篇》 的 0x01:分析准备,文章地址如下:

https://landgrey.me/xunfeng-nascan-analysis/

VulScan 部分的目录结构可抽象为:

1.png

其中主要的逻辑都在 VulScan.py 中,其它放置了几个外部模块 nmb、pyasn1 和 smb;

扫描插件有两种,放置在 vuldb 文件夹下,一是 python 脚本型插件,一是 json 文件型插件;

本文主要分析 VulScan.py 文件并选取一个 python 脚本型插件、一个 json 文件型插件做简要说明,和 Nascan 相比,这部分代码少很多,逻辑也不复杂,所以会啰嗦一点,可能更适合新手学习。

0x01: VulScan 分析

脚本开头执行了下面几行代码

sys.path.append(sys.path[0]+ '/vuldb')

sys.path.append(sys.path[0] + "/../")

from Config import ProductionConfig

主要是将 vuldb 目录和上级目录加入系统路径中,可以直接 from Configimport ProductionConfig 和导入 python 脚本型插件。

然后进行了数据库连接和一些变量初始化工作。

看一下程序入口:

if __name__ == '__main__':
    # 插件初始化加载
    init()
    # 将密码字典、运行线程数、超时时间、ip地址白名单从数据库中取出
    PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()
    # 开启监控线程
    thread.start_new_thread(monitor, ())
    while True:
        # 取出计划任务信息
        task_id, task_plan, task_target, task_plugin = queue_get()
        # 如没有任务,sleep后,回到上一行代码
        if task_id == '':
            time.sleep(5)
            continue
        # 否则进入下面的流程,准备扫描漏洞

        # 清理插件的缓存
        if PLUGIN_DB:
            del sys.modules[PLUGIN_DB.keys()[0]]
            PLUGIN_DB.clear()

        for task_netloc in task_target:
            while True:
                if int(thread._count()) < THREAD_COUNT:
                    if task_netloc[0] in WHITE_LIST:
                        break
                    thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin))
                    break
                else:
                    time.sleep(2)
        if task_plan == 0:
            na_task.update({"_id": task_id}, {"$set": {"status": 2}})

init() 函数首先进行插件初始化加载,如果发现数据库中已经存储有插件数据,就不继续执行了。

if na_plugin.find().count() >= 1:

​ return

插件信息如果没有存储到数据库中,用 os.listdir() 函数列出插件目录下的文件, 按文件名后缀对两种类型插件分类。

for filename in file_list:
    try:
        if filename.split('.')[1] == 'py':
            script_plugin.append(filename.split('.')[0])
            
        if filename.split('.')[1] == 'json':
            json_plugin.append(filename)
    except:
        pass

对于 python 脚本插件,用 __import__ (动态导入),然后统一调用插件中的 get_plugin_info() 方法,将插件详细的描述信息存入数据库:

res_tmp =__import__(plugin_name)

对 json 文件型插件,用 json.loads() 函数加载文件内容

json_text =open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read()

plugin_info = json.loads ( json_text )

删除关于检测部分的漏洞,然后也是只将描述信息存入数据库:

delplugin_info['plugin']

na_plugin.insert(plugin_info)

初始化插件后,将密码字典、运行线程数、超时时间、ip 地址白名单从数据库中取出

PASSWORD_DIC,THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()

开启了一个监控线程,监控是否加载任务,并及时更新密码字典、运行线程数、超时时间、ip 地址白名单:

thread.start_new_thread(monitor,())

有下面这么一段代码,主要是设置 load 值,为下面的不同延时值提供依据,并且写入数据库 Heartbeat 集合中,表示当前有无插件被调用 ( 1 正被调用,0 没有调用)。

queue_count = na_task.find({"status": 0, "plan": 0}).count()
if queue_count:
    load = 1
else:
    ac_count = thread._count()
    load = float(ac_count - 4) / THREAD_COUNT

再回到程序入口 main 中,到达 while True 语句块中,一直检测当前是否有任务进来:

# 取出计划任务信息
task_id, task_plan, task_target, task_plugin = queue_get()

# 如没有任务,sleep后,回到上一行代码
if task_id == '':
    time.sleep(5)
    continue

当任务进来时:

if PLUGIN_DB:

    del sys.modules[PLUGIN_DB.keys()[0]]
    PLUGIN_DB.clear()

sys.modules.keys() 存储了已经加载的模块,再调用已加载模块是取得其中的缓存,没有重新导入模块

clip_image001.png

再看下面两行清除插件缓存的代码:

del sys.modules[PLUGIN_DB.keys()[0]]

PLUGIN_DB.clear()

用 del 删除以前导入的模块缓存,以便重新导入时能导入最新版的 python 脚本插件;

参数用 PLUGIN_DB.keys()[0] 是因为:在 VulScan 扫描逻辑中,是以插件来分类扫描任务的。

即一个插件被多个目标调用,而不是一个目标调用多个插件。所以,最多同时只存在一种类型的插件,也就 keys() 的值一直都是一个,没必要使用 for 循环,全部 del 一遍。

PLUGIN_DB.clear() 是清除内存中 dict() 类型的变量缓存。

做了那么多工作,其实我们还没开始扫描~ 下面代码才真正开始扫描呐,吃不吃惊?前面说了一堆废话……

for task_netloc in task_target:

    while True:

        if int(thread._count()) < THREAD_COUNT:

            # 跳过白名单ip
            if task_netloc[0] in WHITE_LIST:
                break

            thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin))
            break

        else:
            time.sleep(2)

遍历目标,一个目标开一个线程,但是只有当前运行的总线程数 ( thread._count() ) 小于设置的总线程数 ( THREAD_COUNT ) 时,才会继续开新的线程,避免目标过多,开的线程太多卡死!

然后检测了 ip 地址是否为白名单中的地址,如果不是才会继续进行下去;将任务 id、ip 地址和端口号、使用的插件传入 VulScan 进行正式扫描。

vulscan() 类如下:

clip_image002.png

主要看一下关于 json 文件型插件的漏洞检测函数 poc_check() 和主扫描函数 start(),其它看看标注的注释就好了:)

poc_check() 函数的主要流程:

clip_image003.png

start() 函数扫描主流程:

def start(self):
    self.get_plugin_info()
    # json数据标示符检测模式
    if '.json' in self.plugin_info['filename']:
        try:
            # 读取漏洞标示
            self.load_json_plugin()
            # 标示符转换为请求
            self.set_request()
            # 漏洞检测检测
            self.poc_check()
        except Exception, e:
            return
    # python脚本检测模式
    else:
        plugin_filename = self.plugin_info['filename']
        self.log(str(self.task_netloc) + "call " + self.task_plugin)
        # 插件没被加载,根据名称动态导入插件python插件模块
        if task_plugin not in PLUGIN_DB:
            plugin_res = __import__(plugin_filename)
            # 给插件声明密码字典
            setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC)
            # 加入{名称:模块对象} 映射字典
            PLUGIN_DB[plugin_filename] = plugin_res
        try:
            # 调用每个插件的check方法
            self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]), TIMEOUT)
        except:
            pass
    # 保存结果
    self.save_request()

里面用 setattr() 函数给 python 脚本插件增加了 PASSWORD_DIC 这个属性,以便可以直接在插件中调用密码字典

setattr(plugin_res,"PASSWORD_DIC", PASSWORD_DIC)

后面调用了每个插件的 check() 方法,执行插件,并获得扫描信息

self.result_info= PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]),int(self.task_netloc[1]), TIMEOUT)

VulsSan.py 文件基本就那么多内容。

0x02: 插件形式简单分析

选一个 python 插件 crack_postgres.py 看下。

python 脚本型插件里面必须要有的两个函数是:用来返回插件说明信息的 get_plugin_info() 函数:

def get_plugin_info():

    plugin_info = {

        "name": "PostgresSQL弱口令",

        "info": "导致数据库敏感信息泄露,严重可导致服务器直接被入侵。",

        "level": "高危",

        "type": "弱口令",

        "author": "hos@YSRC",

        "url": "",

        "keyword": "server:postgresql",

        "source": 1

}

    return plugin_info

和执行扫描逻辑返回扫描结果的check()函数:

def check(ip, port, timeout):

    user_list = ['postgres', 'admin']

    for user in user_list:

        for pass_ in PASSWORD_DIC:

            try:

                pass_ = str(pass_.replace('{user}', user))

                result = auth(ip, port, user, pass_, timeout)

                if result == 3: break

                if result == True: return u"存在弱口令,用户名:%s 密码:%s" % (user, pass_)

            except Exception, e:

                pass

其它都是非必要函数,可以写在同一个脚本中,供这两个函数调用,最后漏洞利用成功返回相关信息。

其中值得注意的 check() 函数里这行语句,PASSWORD_DIC 这个变量在 crack_postgres.py 中并没有定义,而是上面说的 VulScan.py 中用setattr() 为每个 python 脚本插件定义的。

for pass_ inPASSWORD_DIC:

选个 json 文件型插件 Docker_Remote_API_20161220120458.json 分析下:

{

"info": "Docker Remote API未授权访问可导致代码泄露,严重可导致服务器被入侵控制。",

"source": 1,

"name": "Docker Remote API未授权访问",

"keyword": "port:2375",

"level": "高危",

"url": "http://www.tuicool.com/articles/3Yv2iiY",

"author": "wolf@YSRC",

"type": "未授权访问",

"plugin": {

    "url": "/containers/json",

    "tag": "Remote API 未授权访问",

    "analyzing": "keyword",

    "analyzingdata": "HostConfig",

    "data": "",

    "method": "GET"

    }

}

plugin 子字典类型中存储者关于漏洞扫描的信息,其它都是说明性的参看信息。

plugin 中的几个键代表的意思如下:

url: 要访问的漏洞URL地址

tag: 漏洞标签

analyzing: 分析模式(keyword表示根据关键词判断漏洞)

analyzingdata: 分析的具体数据(这里是指关键词是什么)

data: POST请求要用到的数据

method: 使用的HTTP请求方法

0x03: 总结

思想亮点在于

1、设置了 python 插件的统一格式,有两个必须的函数,用来统一调用,为每个插件都 setattr 密码字典;

2、对较简单 web 漏洞检测的设置 json 格式形插件,总体设置了三种检测漏洞的方式,比较灵活;

3、监控线程用的场合比较好,实时检测数据库中的变化 ( 用户操作的变化 );

难点在于

1、VulScan.py 里面有个 plan 值变量,结合代码并试运行之后,才能确定是表示任务设置的周期(天数);

2、还有个 status 变量值,取值范围【 0,1,2 】,衡量是否到达固定扫描周期的比值,用来触发再次的;

以上都是在下的片面的愚见,若有错误或描述不当之处,还请指正。


本文由 信安之路 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

楼主残忍的关闭了评论