243 lines
10 KiB
Python
Raw Normal View History

#!/usr/bin/python3.8.4 (python版本)
# -*- coding: utf-8 -*-
# @Author : UzJu@菜菜狗
# @Email : UzJuer@163.com
# @Software: PyCharm
# @Time : 2022/7/2 14:22
# @File : main.py
from config.logs import logger
from plugins.results import *
import urllib.parse
import prettytable as pt
import multiprocessing
from core.aliyun import oss
from core.aws import aws
from core.tencent import cos
from core.huaweiyun import obs
def Aliyun_file_scan(filename):
target_file = open(filename, mode='r', encoding='utf-8')
p = multiprocessing.Pool(processes=3)
for i in target_file.read().splitlines():
p.apply_async(Aliyun_OSS, args=(i,))
p.close()
p.join()
p.terminate()
def Aliyun_OSS(target):
"""
:desc: aliyun Bucket Scan function
:param target: Bucket URL
:return:
"""
logger.log("INFOR", f"开始扫描> {target}")
aliyun_print_table_header = pt.PrettyTable(
['Bucket', 'BucketHijack', 'GetBucketObjectList', 'PutBucketObject', 'GetBucketAcl', 'PutBucketAcl',
'GetBucketPolicy'])
aliyun_scan_results = {
"BucketName": target,
"BucketDoesBucketExist": False,
"BucketHijack": False,
"GetBucketObjectList": False,
"PutBucketObject": False,
"GetBucketAcl": False,
"PutBucketAcl": False,
"GetBucketPolicy": False,
}
get_domain = urllib.parse.urlparse(target).netloc
if get_domain == "":
get_target_list = target.split('.')
aliyunOss_Check_init = oss.Aliyun_Oss_Bucket_Check(target=get_target_list[0],
location=get_target_list[1])
aliyunOss_Exploit_init = oss.Aliyun_Oss_Bucket_Exploit(target=get_target_list[0],
location=get_target_list[1])
if aliyunOss_Check_init.Aliyun_Oss_BucketDoesBucketExist():
logger.log("INFOR", f"{target}> 当前存储桶不存在, 尝试劫持存储桶")
if aliyunOss_Exploit_init.Aliyun_Oss_CreateBucket_Exp():
logger.log("ALERT", f"{target}> 新创建/新版存储桶不可劫持")
else:
aliyunOss_Exploit_init.Aliyun_Oss_PutObject_Exp()
aliyunOss_Exploit_init.Aliyun_Oss_PutBucketPolicy_Exp()
aliyunOss_Exploit_init.Aliyun_Oss_GetBucketPolicy_Exp()
aliyunOss_Exploit_init.Aliyun_Oss_PutBucketAcl_Exp()
else:
if aliyunOss_Check_init.Aliyun_Oss_GetBucketObject_List():
logger.log("INFOR", f"{target}> 存储桶对象可遍历")
aliyun_scan_results['GetBucketObject'] = True
else:
logger.log("ALERT", f"{target}> 存储桶对象不可遍历")
if aliyunOss_Check_init.Aliyun_Oss_PutBucketObject():
logger.log("INFOR", f"{target}> 可未授权上传对象至存储桶(可导致覆盖已有对象)")
aliyun_scan_results['PutBucketObject'] = True
else:
logger.log("ALERT", f"{target}> 不可未授权上传对象至存储桶")
if aliyunOss_Check_init.Aliyun_Oss_GetBucketAcl():
logger.log("INFOR", f"{target}> 可公开访问存储桶ACL策略")
aliyun_scan_results['GetBucketAcl'] = True
else:
logger.log("ALERT", f"{target}> 不可公开访问存储桶ACL策略")
if aliyunOss_Check_init.Aliyun_Oss_PutBucketAcl():
logger.log("INFOR", f"{target}> 可上传覆盖存储桶ACL策略")
aliyun_scan_results['PutBucketAcl'] = True
else:
logger.log("ALERT", f"{target}> 不可上传覆盖存储桶ACL策略")
results_policy = aliyunOss_Check_init.Aliyun_Oss_GetBucketPolicy()
if results_policy:
logger.log("INFOR", f"{target}> 可公开获取存储桶Policy策略组")
logger.log("INFOR", f"{target}Policy> {results_policy}")
aliyun_scan_results['GetBucketPolicy'] = True
else:
logger.log("ALERT", f"{target}> 不可公开获取存储桶Policy策略")
aliyun_print_table_header.add_row([target,
aliyun_scan_results['BucketDoesBucketExist'],
aliyun_scan_results['GetBucketObject'],
aliyun_scan_results['PutBucketObject'],
aliyun_scan_results['GetBucketAcl'],
aliyun_scan_results['PutBucketAcl'],
aliyun_scan_results['GetBucketPolicy']])
aliyun_save_file(target,
aliyun_scan_results['BucketDoesBucketExist'],
aliyun_scan_results['GetBucketObject'],
aliyun_scan_results['PutBucketObject'],
aliyun_scan_results['GetBucketAcl'],
aliyun_scan_results['PutBucketAcl'],
aliyun_scan_results['GetBucketPolicy'])
print(aliyun_print_table_header)
else:
Aliyun_OSS(get_domain)
def Tencent_Cloud_Cos(target):
tencent_cloud_print_table_header = pt.PrettyTable(
['Bucket', 'ListObject', 'PutObject', 'GetBucketACL'])
tencent_cloud_results = {
"BucketName": target,
"ListObject": False,
"PutObject": False,
"GetBucketACL": False
}
get_domain = urllib.parse.urlparse(target).netloc
if get_domain == "":
if "cos" not in target:
logger.log("ALERT", f"当前{target}非COS存储桶地址")
return
logger.log("INFOR", f"开始扫描> {target}")
get_target_list = target.split(".")
tencent_check_init = cos.TenCent_Cloud_OBS_Check(target=get_target_list[0],
location=get_target_list[2])
if tencent_check_init.ListObject():
tencent_cloud_results['ListObject'] = True
if tencent_check_init.PutObject():
tencent_cloud_results['PutObject'] = True
if tencent_check_init.GetBucketACL():
tencent_cloud_results['GetBucketACL'] = True
else:
Tencent_Cloud_Cos(target)
tencent_cloud_print_table_header.add_row([target,
tencent_cloud_results['ListObject'],
tencent_cloud_results['PutObject'],
tencent_cloud_results['GetBucketACL']])
tencent_save_file(target,
tencent_cloud_results['ListObject'],
tencent_cloud_results['PutObject'],
tencent_cloud_results['GetBucketACL'])
print(tencent_cloud_print_table_header)
def Huawei_Cloud_OBS(target):
huawei_cloud_print_table_header = pt.PrettyTable(
['Bucket', 'ListObject', 'PutObject', 'GetBucketACL'])
huawei_cloud_results = {
"BucketName": target,
"ListObject": False,
"PutObject": False,
"GetBucketACL": False
}
get_domain = urllib.parse.urlparse(target).netloc
if get_domain == "":
if "obs" not in target:
logger.log("ALERT", f"当前{target}非OBS存储桶地址")
return
logger.log("INFOR", f"开始扫描> {target}")
get_target_list = target.split(".")
huaweiyun_check_init = obs.HuaWeiCloud_OBS_Check(target=get_target_list[0],
location=get_target_list[2])
if huaweiyun_check_init.ListObject():
huawei_cloud_results['ListObject'] = True
if huaweiyun_check_init.PutObject():
huawei_cloud_results['PutObject'] = True
if huaweiyun_check_init.GetBucketACL():
huawei_cloud_results['GetBucketACL'] = True
else:
Huawei_Cloud_OBS(target)
huawei_cloud_print_table_header.add_row([target,
huawei_cloud_results['ListObject'],
huawei_cloud_results['PutObject'],
huawei_cloud_results['GetBucketACL']])
huawei_save_file(target,
huawei_cloud_results['ListObject'],
huawei_cloud_results['PutObject'],
huawei_cloud_results['GetBucketACL'])
print(huawei_cloud_print_table_header)
def AmazoneS3(target):
"""
:desc: aws bucket scan
:param target: bucket url
:return:
"""
aws_print_table_header = pt.PrettyTable(
['Bucket', 'ListObject', 'PutObject', 'GetBucketACL'])
aws_results = {
"BucketName": target,
"ListObject": False,
"PutObject": False,
"GetBucketACL": False
}
get_domain = urllib.parse.urlparse(target).netloc
if get_domain == "":
logger.log("INFOR", f"开始扫描> {target}")
get_target_list = target.split(".")
aws_check_init = aws.Amazone_Cloud_S3Bucket_Check(target=get_target_list[0],
location=get_target_list[1])
if aws_check_init.Check_Bucket_ListObject():
logger.log("INFOR", f"{target}> 存储桶对象可遍历")
aws_results['ListObject'] = True
else:
logger.log("ALERT", f"{target}> 存储桶对象不可遍历")
if aws_check_init.Check_Bucket_PutObject():
logger.log("INFOR", f"{target}> 可未授权上传对象至存储桶(可覆盖存储桶已有对象)")
aws_results['PutObject'] = True
else:
logger.log("ALERT", f"{target}> 不可未授权上传对象至存储桶(可覆盖存储桶已有对象)")
if aws_check_init.Check_Bucket_GetBucketAcl():
logger.log("INFOR", f"{target}> 存储桶ACL策略可公开获取")
aws_results['GetBucketACL'] = True
else:
logger.log("ALERT", f"{target}> 存储桶ACL策略不可公开")
else:
AmazoneS3(get_domain)
aws_print_table_header.add_row([target,
aws_results['ListObject'],
aws_results['PutObject'],
aws_results['GetBucketACL']])
aws_save_file(target,
aws_results['ListObject'],
aws_results['PutObject'],
aws_results['GetBucketACL'])
print(aws_print_table_header)