Python数字取证 - 快速指南



Python数字取证 - 简介

本章将为您介绍数字取证的本质及其历史回顾。您还将了解在现实生活中可以将数字取证应用于何处以及其局限性。

什么是数字取证?

数字取证可以定义为取证科学的一个分支,它分析、检查、识别和恢复驻留在电子设备上的数字证据。它通常用于刑事法律和私人调查。

例如,如果有人在电子设备上窃取了一些数据,您可以依靠数字取证提取证据。

数字取证的简要历史回顾

本节解释了计算机犯罪的历史和数字取证的历史回顾,如下所示:

1970年代-1980年代:第一次计算机犯罪

在此十年之前,没有承认过计算机犯罪。但是,如果它被认为会发生,那么当时的现行法律会处理它们。后来,1978年,佛罗里达州计算机犯罪法案承认了第一次计算机犯罪,其中包括针对未经授权修改或删除计算机系统上数据的立法。但随着时间的推移,由于技术的进步,所犯的计算机犯罪范围也随之扩大。为了应对与版权、隐私和儿童色情相关的犯罪,通过了其他各种法律。

1980年代-1990年代:发展十年

这个十年是数字取证的开发十年,这完全是因为第一次调查(1986年),其中克利夫·斯托尔追踪了名为马库斯·赫斯的黑客。在此期间,发展了两种数字取证学科——第一种是在从业人员将其作为爱好而开发的临时工具和技术的帮助下,而第二种则由科学界开发。1992年,术语“计算机取证”被用于学术文献中。

2000年代-2010年代:标准化十年

在数字取证发展到一定程度后,需要制定一些在进行调查时可以遵循的具体标准。因此,各个科学机构和组织发布了数字取证指南。2002年,数字证据科学工作组 (SWGDE) 发布了一篇名为“计算机取证最佳实践”的论文。另一项值得称道的成果是欧洲牵头的国际条约,即“网络犯罪公约”,由 43 个国家签署,16 个国家批准。即使有这样的标准,研究人员也仍然需要解决一些问题。

数字取证流程

自1978年第一次计算机犯罪以来,数字犯罪活动大幅增加。由于这种增加,需要以结构化的方式来处理它们。1984年,引入了正式流程,此后开发了大量新的改进的计算机取证调查流程。

计算机取证调查流程涉及三个主要阶段,如下所述:

阶段1:获取或成像证据

数字取证的第一阶段涉及保存数字系统的状态,以便以后对其进行分析。这与从犯罪现场拍摄照片、采集血液样本等非常相似。例如,它涉及捕获硬盘或 RAM 的已分配和未分配区域的映像。

阶段2:分析

此阶段的输入是在获取阶段获取的数据。在这里,检查这些数据以识别证据。此阶段提供三种证据,如下所示:

  • 罪证 - 这些证据支持给定的历史记录。

  • 无罪证据 - 这些证据与给定的历史记录相矛盾。

  • 篡改证据 - 这些证据表明系统已被篡改以避免识别。它包括检查文件和目录内容以恢复已删除的文件。

阶段3:展示或报告

顾名思义,此阶段展示了调查的结论和相应的证据。

数字取证的应用

数字取证处理收集、分析和保存任何数字设备中包含的证据。数字取证的使用取决于应用程序。如前所述,它主要用于以下两个应用程序:

刑事法律

在刑事法律中,收集证据以支持或反对法庭上的假设。取证程序与刑事调查中使用的程序非常相似,但具有不同的法律要求和限制。

私人调查

主要公司世界使用数字取证进行私人调查。当公司怀疑员工可能在其计算机上执行违反公司政策的非法活动时,就会使用它。当调查某人是否存在数字不端行为时,数字取证为公司或个人提供了最佳途径之一。

数字取证的分支

数字犯罪不仅限于计算机,而且黑客和罪犯也在大规模使用平板电脑、智能手机等小型数字设备。一些设备具有易失性存储器,而另一些则具有非易失性存储器。因此,根据设备类型,数字取证具有以下分支:

计算机取证

数字取证的这个分支处理计算机、嵌入式系统和静态存储器(如 USB 驱动器)。在计算机取证中可以调查从日志到驱动器上实际文件的各种信息。

移动取证

这处理从移动设备调查数据。这个分支不同于计算机取证,因为移动设备具有内置通信系统,这对于提供与位置相关的有用信息很有用。

网络取证

这处理监控和分析计算机网络流量(本地和广域网 (WAN))以获取信息、收集证据或入侵检测。

数据库取证

数字取证的这个分支处理数据库及其元数据的取证研究。

数字取证调查所需的技能

数字取证检查员帮助追踪黑客、恢复被盗数据、将计算机攻击追溯到其源头,并协助涉及计算机的其他类型的调查。下面讨论了一些成为数字取证检查员所需的关键技能:

杰出的思维能力

数字取证调查员必须具备杰出的思维能力,并且能够在特定任务上应用不同的工具和方法来获得输出。他/她必须能够找到不同的模式并在它们之间建立关联。

技术技能

数字取证检查员必须具备良好的技术技能,因为该领域需要网络知识、数字系统如何交互。

对网络安全充满热情

因为数字取证领域完全是关于解决网络犯罪,这是一项繁琐的任务,需要很多人才能成为一名优秀的数字取证调查员。

沟通技巧

良好的沟通技巧对于与各个团队协调以及提取任何缺失的数据或信息至关重要。

熟练制作报告

在成功实施获取和分析后,数字取证检查员必须在最终报告和演示文稿中提及所有发现。因此,他/她必须具备良好的报告撰写技巧和对细节的关注。

局限性

数字取证调查存在某些局限性,此处讨论:

需要提供令人信服的证据

数字取证调查的主要障碍之一是,检查员必须遵守法庭证据所需的标准,因为数据很容易被篡改。另一方面,计算机取证调查员必须完全了解法律要求、证据处理和文件程序,以便在法庭上提供令人信服的证据。

调查工具

数字调查的有效性完全取决于数字取证检查员的专业知识和正确调查工具的选择。如果使用的工具不符合指定标准,那么在法庭上,法官可能会否认这些证据。

受众缺乏技术知识

另一个限制是,有些人并不完全熟悉计算机取证;因此,许多人都不了解这个领域。调查人员必须确保以一种帮助每个人理解结果的方式向法院传达他们的调查结果。

成本

生成数字证据并对其进行保存非常昂贵。因此,这个过程可能不会被许多负担不起成本的人选择。

Python数字取证 - 入门

在上一章中,我们学习了数字取证的基础知识、其优点和局限性。本章将使您熟悉 Python,这是我们在本次数字取证调查中使用的基本工具。

为什么选择 Python 进行数字取证?

Python 是一种流行的编程语言,被用作网络安全、渗透测试以及数字取证调查的工具。当您选择 Python 作为您的数字取证工具时,您无需任何其他第三方软件即可完成任务。

下面列出了一些 Python 编程语言的独特功能,使其非常适合数字取证项目:

  • 语法简单 - 与其他语言相比,Python 的语法很简单,这使得人们更容易学习并将其用于数字取证。

  • 内置模块全面 - Python 的内置模块非常全面,有助于进行完整的数字取证调查。

  • 帮助和支持 - 作为一种开源编程语言,Python 享有开发人员和用户社区的出色支持。

Python 的特性

Python 作为一种高级的、解释型的、交互式的和面向对象的脚本语言,提供了以下特性:

  • 易于学习 - Python 是一种开发人员友好且易于学习的语言,因为它只有很少的关键字和最简单的结构。

  • 表达力强且易于阅读 - Python 语言本质上表达力强,因此其代码更易于理解和阅读。

  • 跨平台兼容 - Python 是一种跨平台兼容的语言,这意味着它可以在各种平台上高效运行,例如 UNIX、Windows 和 Macintosh。

  • 交互式编程 - 我们可以对代码进行交互式测试和调试,因为 Python 支持交互式编程模式。

  • 提供各种模块和函数 - Python 拥有庞大的标准库,允许我们为脚本使用丰富的模块和函数集。

  • 支持动态类型检查 - Python 支持动态类型检查并提供非常高级的动态数据类型。

  • GUI 编程 - Python 支持 GUI 编程来开发图形用户界面。

  • 与其他编程语言集成 - Python 可以轻松地与其他编程语言集成,如 C、C++、JAVA 等。

安装 Python

Python 发行版适用于各种平台,例如 Windows、UNIX、Linux 和 Mac。我们只需要根据我们的平台下载二进制代码即可。如果任何平台的二进制代码不可用,我们必须拥有 C 编译器,以便手动编译源代码。

本节将使您熟悉在各种平台上安装 Python 的方法:

在 Unix 和 Linux 上安装 Python

您可以按照以下步骤在 Unix/Linux 机器上安装 Python。

步骤 1 - 打开 Web 浏览器。键入并输入 www.python.org/downloads/

步骤 2 - 下载适用于 Unix/Linux 的压缩源代码。

步骤 3 - 解压缩下载的压缩文件。

步骤 4 - 如果您希望自定义某些选项,可以编辑Modules/Setup 文件

步骤 5 - 使用以下命令完成安装:

run ./configure script
make
make install

成功完成上述步骤后,Python 将安装在其标准位置/usr/local/bin,其库安装在/usr/local/lib/pythonXX,其中 XX 是 Python 的版本。

在 Windows 上安装 Python

我们可以按照以下简单步骤在 Windows 机器上安装 Python。

步骤 1 - 打开 Web 浏览器。键入并输入 www.python.org/downloads/

步骤 2 - 下载 Windows 安装程序python-XYZ.msi文件,其中 XYZ 是我们需要安装的版本。

步骤 3 - 将安装程序文件保存到本地计算机后,现在运行该 MSI 文件。

步骤 4 - 运行下载的文件,这将启动 Python 安装向导。

在 Macintosh 上安装 Python

要在 Mac OS X 上安装 Python 3,我们必须使用名为Homebrew的软件包安装程序。

如果您的系统上没有 Homebrew,可以使用以下命令安装它:

$ ruby -e "$(curl -fsSL
https://raw.githubusercontent.com/Homebrew/install/master/install)"

如果需要更新包管理器,可以使用以下命令:

$ brew update

现在,使用以下命令在您的系统上安装 Python3:

$ brew install python3

设置 PATH

我们需要为 Python 安装设置路径,这在 UNIX、WINDOWS 或 MAC 等平台上有所不同。

在 Unix/Linux 上设置路径

您可以使用以下选项在 Unix/Linux 上设置路径:

  • 如果使用 csh shell - 键入setenv PATH "$PATH:/usr/local/bin/python",然后按 Enter。

  • 如果使用 bash shell (Linux) - 键入export PATH="$PATH:/usr/local/bin/python",然后按 Enter。

  • 如果使用 sh 或 ksh shell - 键入PATH="$PATH:/usr/local/bin/python",然后按 Enter。

在 Windows 上设置路径

在命令提示符下键入path %path%;C:\Python,然后按 Enter。

运行 Python

您可以选择以下三种方法中的任何一种来启动 Python 解释器:

方法 1:使用交互式解释器

提供命令行解释器或 shell 的系统可以轻松地用于启动 Python。例如,Unix、DOS 等。您可以按照以下步骤在交互式解释器中开始编码:

步骤 1 - 在命令行中输入python

步骤 2 - 使用以下所示的命令立即在交互式解释器中开始编码:

$python # Unix/Linux
or
python% # Unix/Linux
or
C:> python # Windows/DOS

方法 2:从命令行使用脚本

我们还可以通过在应用程序上调用解释器来在命令行中执行 Python 脚本。您可以使用以下所示的命令:

$python script.py # Unix/Linux
or
python% script.py # Unix/Linux
or
C: >python script.py # Windows/DOS

方法 3:集成开发环境

如果系统具有支持 Python 的 GUI 应用程序,则可以从该 GUI 环境中运行 Python。以下是一些针对各种平台的 IDE:

  • Unix IDE - UNIX 为 Python 提供了 IDLE IDE。

  • Windows IDE - Windows 提供了 PythonWin,它是 Python 的第一个 Windows 界面,并带有 GUI。

  • Macintosh IDE - Macintosh 提供了 IDLE IDE,可从主网站下载,可以下载为 MacBinary 或 BinHex'd 文件。

工件报告

现在您已经熟悉了在本地系统上安装和运行 Python 命令,让我们详细了解取证概念。本章将解释在 Python 数字取证中处理工件涉及的各种概念。

创建报告的必要性

数字取证过程包括报告作为第三阶段。这是数字取证过程中最重要的部分之一。创建报告是必要的,原因如下:

  • 它是数字取证检查员概述调查过程及其发现的文档。

  • 其他检查员可以参考一份好的数字取证报告,通过给定的相同存储库获得相同的结果。

  • 它是一份技术和科学文档,包含在数字证据的 1 和 0 中发现的事实。

报告创建的一般指南

报告的编写是为了向读者提供信息,并且必须以坚实的基础开始。如果报告在没有一些一般指南或标准的情况下准备,调查人员可能会难以有效地呈现他们的发现。以下是一些创建数字取证报告时必须遵循的一般指南:

  • 摘要 - 报告必须包含信息的简要摘要,以便读者能够确定报告的目的。

  • 使用的工具 - 我们必须提及用于执行数字取证过程的工具,包括其用途。

  • 存储库 - 假设我们调查了某人的计算机,然后是证据的摘要和相关材料(如电子邮件、内部搜索历史记录等)的分析,则必须将其包含在报告中,以便清楚地呈现案例。

  • 对律师的建议 - 报告必须包含对律师的建议,以便根据报告中的发现继续或停止调查。

创建不同类型的报告

在上一节中,我们了解了报告在数字取证中的重要性以及创建报告的指南。下面讨论了 Python 中用于创建不同类型报告的一些格式:

CSV 报告

报告最常见的输出格式之一是 CSV 电子表格报告。您可以创建 CSV 以使用以下所示的 Python 代码创建已处理数据的报告:

首先,导入用于写入电子表格的有用库:

from __future__ import print_function
import csv
import os
import sys

现在,调用以下方法:

Write_csv(TEST_DATA_LIST, ["Name", "Age", "City", "Job description"], os.getcwd())

我们使用以下全局变量来表示示例数据类型:

TEST_DATA_LIST = [["Ram", 32, Bhopal, Manager], 
   ["Raman", 42, Indore, Engg.],
   ["Mohan", 25, Chandigarh, HR], 
   ["Parkash", 45, Delhi, IT]]

接下来,让我们定义方法以继续进行进一步的操作。我们以“w”模式打开文件并将 newline 关键字参数设置为空字符串。

def Write_csv(data, header, output_directory, name = None):
   if name is None:
      name = "report1.csv"
   print("[+] Writing {} to {}".format(name, output_directory))
   
   with open(os.path.join(output_directory, name), "w", newline = "") as \ csvfile:
      writer = csv.writer(csvfile)
      writer.writerow(header)
      writer.writerow(data)

如果运行上述脚本,您将获得存储在 report1.csv 文件中的以下详细信息。

姓名 年龄 城市 职位
Ram 32 32 Bhopal
经理 42 Raman 28
Indore 25 工程师 Mohan
35 45 Chandigarh 人力资源

Parkash

另一种常见的报告输出格式是 Excel (.xlsx) 电子表格报告。我们可以创建表格并使用 Excel 绘制图形。我们可以使用以下所示的 Python 代码以 Excel 格式创建已处理数据的报告:

首先,导入 XlsxWriter 模块以创建电子表格:

import xlsxwriter

现在,创建一个工作簿对象。为此,我们需要使用 Workbook() 构造函数。

workbook = xlsxwriter.Workbook('report2.xlsx')

现在,使用 add_worksheet() 模块创建一个新的工作表。

worksheet = workbook.add_worksheet()

接下来,将以下数据写入工作表:

report2 = (['Ram', 32, ‘Bhopal’],['Mohan',25, ‘Chandigarh’] ,['Parkash',45, ‘Delhi’])

row = 0
col = 0

您可以遍历此数据并按如下方式写入:

for item, cost in (a):
   worksheet.write(row, col, item)
   worksheet.write(row, col+1, cost)
   row + = 1

现在,让我们使用 close() 方法关闭此 Excel 文件。

workbook.close()

上述脚本将创建一个名为 report2.xlsx 的 Excel 文件,其中包含以下数据:

Ram 32 32
Indore 25 工程师
35 45 Chandigarh

调查采集介质

对于调查人员来说,拥有详细的调查记录以准确回忆发现或将所有调查片段拼凑起来非常重要。屏幕截图对于跟踪特定调查采取的步骤非常有用。借助以下 Python 代码,我们可以截取屏幕截图并将其保存到硬盘以供将来使用。

首先,使用以下命令安装名为 pyscreenshot 的 Python 模块:

Pip install pyscreenshot

现在,导入必要的模块,如下所示:

import pyscreenshot as ImageGrab

使用以下代码行获取屏幕截图:

image = ImageGrab.grab()

使用以下代码行将屏幕截图保存到给定位置:

image.save('d:/image123.png')

现在,如果要将屏幕截图作为图形弹出,可以使用以下 Python 代码:

import numpy as np
import matplotlib.pyplot as plt
import pyscreenshot as ImageGrab
imageg = ImageGrab.grab()
plt.imshow(image, cmap='gray', interpolation='bilinear')
plt.show()

Python 数字移动设备取证

本章将解释移动设备上的 Python 数字取证以及所涉及的概念。

简介

移动设备取证是数字取证的一个分支,它处理移动设备的采集和分析,以恢复与调查相关的数字证据。该分支不同于计算机取证,因为移动设备具有内置的通信系统,可用于提供与位置相关的有用信息。

尽管智能手机在数字取证中的使用日益增多,但由于其异构性,它仍然被认为是非标准的。另一方面,计算机硬件(如硬盘)被认为是标准的,并且也发展成为一个稳定的学科。在数字取证行业,关于用于非标准设备(具有瞬态证据,如智能手机)的技术存在很多争论。

可从移动设备中提取的工件

与仅具有通话记录或短信的旧手机相比,现代移动设备拥有大量数字信息。因此,移动设备可以为调查人员提供对其用户的大量见解。可以从移动设备中提取的一些工件如下所示:

  • 消息 − 这些是有用的工件,可以揭示所有者的思维状态,甚至可以为调查人员提供一些以前未知的信息。

  • 位置历史记录− 位置历史记录数据是一个有用的工件,调查人员可以使用它来验证某人的特定位置。

  • 已安装应用程序 − 通过访问已安装的应用程序类型,调查人员可以深入了解手机用户的习惯和思维方式。

Python 中的证据来源和处理

智能手机使用 SQLite 数据库和 PLIST 文件作为主要证据来源。在本节中,我们将使用 Python 处理证据来源。

分析 PLIST 文件

PLIST(属性列表)是一种灵活且方便的格式,用于存储应用程序数据,尤其是在 iPhone 设备上。它使用扩展名.plist。此类文件用于存储有关捆绑包和应用程序的信息。它可以采用两种格式:XML二进制。以下 Python 代码将打开并读取 PLIST 文件。请注意,在继续执行此操作之前,我们必须创建自己的Info.plist 文件。

首先,使用以下命令安装名为biplist 的第三方库:

Pip install biplist

现在,导入一些有用的库来处理 plist 文件:

import biplist
import os
import sys

现在,在 main 方法下使用以下命令可以将 plist 文件读取到变量中:

def main(plist):
   try:
      data = biplist.readPlist(plist)
   except (biplist.InvalidPlistException,biplist.NotBinaryPlistException) as e:
print("[-] Invalid PLIST file - unable to be opened by biplist")
sys.exit(1)

现在,我们可以从这个变量中读取控制台上的数据或直接打印它。

SQLite 数据库

SQLite 充当移动设备上的主要数据存储库。SQLite 是一个进程内库,它实现了自包含、无服务器、零配置的事务性 SQL 数据库引擎。它是一个零配置的数据库,您无需在系统中配置它,这与其他数据库不同。

如果您是新手或不熟悉 SQLite 数据库,您可以访问链接www.tutorialspoint.com/sqlite/index.htm。此外,如果您想深入了解 SQLite 和 Python,可以访问链接www.tutorialspoint.com/sqlite/sqlite_python.htm

在移动取证过程中,我们可以与移动设备的sms.db文件交互,并可以从message表中提取有价值的信息。Python 有一个名为sqlite3的内置库,用于连接 SQLite 数据库。您可以使用以下命令导入它:

import sqlite3

现在,借助以下命令,我们可以连接到数据库,例如移动设备中的sms.db

Conn = sqlite3.connect(‘sms.db’)
C = conn.cursor()

这里,C 是游标对象,我们可以借助它与数据库交互。

现在,假设如果我们想执行特定的命令,例如获取abc 表中的详细信息,可以使用以下命令:

c.execute(“Select * from abc”)
c.close()

上述命令的结果将存储在cursor对象中。类似地,我们可以使用fetchall()方法将结果转储到我们可以操作的变量中。

我们可以使用以下命令获取sms.db中 message 表的列名数据:

c.execute(“pragma table_info(message)”)
table_data = c.fetchall()
columns = [x[1] for x in table_data

请注意,这里我们使用的是 SQLite PRAGMA 命令,这是一个特殊的命令,用于控制 SQLite 环境中的各种环境变量和状态标志。在上述命令中,fetchall()方法返回一个结果元组。每个列的名称都存储在每个元组的第一个索引中。

现在,借助以下命令,我们可以查询该表的所有数据并将其存储在名为data_msg的变量中:

c.execute(“Select * from message”)
data_msg = c.fetchall()

上述命令会将数据存储在变量中,此外,我们还可以使用csv.writer()方法将上述数据写入 CSV 文件。

iTunes 备份

iPhone 移动取证可以在 iTunes 创建的备份上执行。取证检查员依靠分析通过 iTunes 获取的 iPhone 逻辑备份。iTunes 使用 AFC(Apple 文件连接)协议进行备份。此外,备份过程不会修改 iPhone 上的任何内容,除了托管密钥记录。

现在,问题出现了,为什么数字取证专家需要了解 iTunes 备份的技术?如果我们获得了嫌疑人的电脑而不是直接获得 iPhone,这一点很重要,因为当电脑用于与 iPhone 同步时,iPhone 上的大多数信息都可能备份到电脑上。

备份过程及其位置

每当将 Apple 产品备份到电脑时,它都会与 iTunes 同步,并且将有一个包含设备唯一 ID 的特定文件夹。在最新的备份格式中,文件存储在包含文件名前两个十六进制字符的子文件夹中。在这些备份文件中,有一些文件如 info.plist 很有用,以及名为 Manifest.db 的数据库。下表显示了备份位置,这些位置因 iTunes 备份的操作系统而异:

操作系统 备份位置
Win7 C:\Users\[用户名]\AppData\Roaming\AppleComputer\MobileSync\Backup\
MAC OS X ~/Library/Application Suport/MobileSync/Backup/

要使用 Python 处理 iTunes 备份,我们首先需要根据我们的操作系统识别备份位置中的所有备份。然后,我们将遍历每个备份并读取数据库 Manifest.db。

现在,借助以下 Python 代码,我们可以执行相同的操作:

首先,导入必要的库,如下所示:

from __future__ import print_function
import argparse
import logging
import os

from shutil import copyfile
import sqlite3
import sys
logger = logging.getLogger(__name__)

现在,提供两个位置参数,即 INPUT_DIR 和 OUTPUT_DIR,分别表示 iTunes 备份和所需的输出文件夹:

if __name__ == "__main__":
   parser.add_argument("INPUT_DIR",help = "Location of folder containing iOS backups, ""e.g. ~\Library\Application Support\MobileSync\Backup folder")
   parser.add_argument("OUTPUT_DIR", help = "Output Directory")
   parser.add_argument("-l", help = "Log file path",default = __file__[:-2] + "log")
   parser.add_argument("-v", help = "Increase verbosity",action = "store_true") args = parser.parse_args()

现在,设置日志,如下所示:

if args.v:
   logger.setLevel(logging.DEBUG)
else:
   logger.setLevel(logging.INFO)

现在,设置此日志的消息格式,如下所示:

msg_fmt = logging.Formatter("%(asctime)-15s %(funcName)-13s""%(levelname)-8s %(message)s")
strhndl = logging.StreamHandler(sys.stderr)
strhndl.setFormatter(fmt = msg_fmt)

fhndl = logging.FileHandler(args.l, mode = 'a')
fhndl.setFormatter(fmt = msg_fmt)

logger.addHandler(strhndl)
logger.addHandler(fhndl)
logger.info("Starting iBackup Visualizer")
logger.debug("Supplied arguments: {}".format(" ".join(sys.argv[1:])))
logger.debug("System: " + sys.platform)
logger.debug("Python Version: " + sys.version)

以下代码行将使用os.makedirs()函数为所需的输出目录创建必要的文件夹:

if not os.path.exists(args.OUTPUT_DIR):
   os.makedirs(args.OUTPUT_DIR)

现在,将提供的输入和输出目录传递给 main() 函数,如下所示:

if os.path.exists(args.INPUT_DIR) and os.path.isdir(args.INPUT_DIR):
   main(args.INPUT_DIR, args.OUTPUT_DIR)
else:
   logger.error("Supplied input directory does not exist or is not ""a directory")
   sys.exit(1)

现在,编写main()函数,该函数将进一步调用backup_summary()函数以识别输入文件夹中存在的所有备份:

def main(in_dir, out_dir):
   backups = backup_summary(in_dir)
def backup_summary(in_dir):
   logger.info("Identifying all iOS backups in {}".format(in_dir))
   root = os.listdir(in_dir)
   backups = {}
   
   for x in root:
      temp_dir = os.path.join(in_dir, x)
      if os.path.isdir(temp_dir) and len(x) == 40:
         num_files = 0
         size = 0
         
         for root, subdir, files in os.walk(temp_dir):
            num_files += len(files)
            size += sum(os.path.getsize(os.path.join(root, name))
               for name in files)
         backups[x] = [temp_dir, num_files, size]
   return backups

现在,将每个备份的摘要打印到控制台,如下所示:

print("Backup Summary")
print("=" * 20)

if len(backups) > 0:
   for i, b in enumerate(backups):
      print("Backup No.: {} \n""Backup Dev. Name: {} \n""# Files: {} \n""Backup Size (Bytes): {}\n".format(i, b, backups[b][1], backups[b][2]))

现在,将 Manifest.db 文件的内容转储到名为 db_items 的变量中。

try:
   db_items = process_manifest(backups[b][0])
   except IOError:
      logger.warn("Non-iOS 10 backup encountered or " "invalid backup. Continuing to next backup.")
continue

现在,让我们定义一个函数,该函数将获取备份的目录路径:

def process_manifest(backup):
   manifest = os.path.join(backup, "Manifest.db")
   
   if not os.path.exists(manifest):
      logger.error("Manifest DB not found in {}".format(manifest))
      raise IOError

现在,使用 SQLite3,我们将通过名为 c 的游标连接到数据库:

c = conn.cursor()
items = {}

for row in c.execute("SELECT * from Files;"):
   items[row[0]] = [row[2], row[1], row[3]]
return items

create_files(in_dir, out_dir, b, db_items)
   print("=" * 20)
else:
   logger.warning("No valid backups found. The input directory should be
      " "the parent-directory immediately above the SHA-1 hash " "iOS device backups")
      sys.exit(2)

现在,定义create_files()方法,如下所示:

def create_files(in_dir, out_dir, b, db_items):
   msg = "Copying Files for backup {} to {}".format(b, os.path.join(out_dir, b))
   logger.info(msg)

现在,遍历db_items字典中的每个键:

for x, key in enumerate(db_items):
   if db_items[key][0] is None or db_items[key][0] == "":
      continue
   else:
      dirpath = os.path.join(out_dir, b,
os.path.dirname(db_items[key][0]))
   filepath = os.path.join(out_dir, b, db_items[key][0])
   
   if not os.path.exists(dirpath):
      os.makedirs(dirpath)
      original_dir = b + "/" + key[0:2] + "/" + key
   path = os.path.join(in_dir, original_dir)
   
   if os.path.exists(filepath):
      filepath = filepath + "_{}".format(x)

现在,使用shutil.copyfile()方法复制备份的文件,如下所示:

try:
   copyfile(path, filepath)
   except IOError:
      logger.debug("File not found in backup: {}".format(path))
         files_not_found += 1
   if files_not_found > 0:
      logger.warning("{} files listed in the Manifest.db not" "found in
backup".format(files_not_found))
   copyfile(os.path.join(in_dir, b, "Info.plist"), os.path.join(out_dir, b,
"Info.plist"))
   copyfile(os.path.join(in_dir, b, "Manifest.db"), os.path.join(out_dir, b,
"Manifest.db"))
   copyfile(os.path.join(in_dir, b, "Manifest.plist"), os.path.join(out_dir, b,
"Manifest.plist"))
   copyfile(os.path.join(in_dir, b, "Status.plist"),os.path.join(out_dir, b,
"Status.plist"))

使用上述 Python 脚本,我们可以在输出文件夹中获取更新的备份文件结构。我们可以使用pycrypto python 库来解密备份。

Wi-Fi

移动设备可以通过连接到随处可见的 Wi-Fi 网络来连接到外部世界。有时设备会自动连接到这些开放网络。

对于 iPhone,设备已连接的开放 Wi-Fi 连接列表存储在名为com.apple.wifi.plist的 PLIST 文件中。此文件将包含 Wi-Fi SSID、BSSID 和连接时间。

我们需要使用 Python 从标准 Cellebrite XML 报告中提取 Wi-Fi 详细信息。为此,我们需要使用无线地理位置记录引擎 (WIGLE) 的 API,这是一个流行的平台,可用于使用 Wi-Fi 网络名称查找设备的位置。

我们可以使用名为requests的 Python 库来访问 WIGLE 的 API。它可以按如下方式安装:

pip install requests

WIGLE 的 API

我们需要在 WIGLE 的网站https://wigle.net/account上注册以获取 WIGLE 的免费 API。下面讨论了获取有关用户设备及其通过 WIGEL 的 API 连接的信息的 Python 脚本:

首先,导入以下库以处理不同的内容:

from __future__ import print_function

import argparse
import csv
import os
import sys
import xml.etree.ElementTree as ET
import requests

现在,提供两个位置参数,即INPUT_FILEOUTPUT_CSV,它们分别表示包含 Wi-Fi MAC 地址的输入文件和所需的输出 CSV 文件:

if __name__ == "__main__":
   parser.add_argument("INPUT_FILE", help = "INPUT FILE with MAC Addresses")
   parser.add_argument("OUTPUT_CSV", help = "Output CSV File")
   parser.add_argument("-t", help = "Input type: Cellebrite XML report or TXT
file",choices = ('xml', 'txt'), default = "xml")
   parser.add_argument('--api', help = "Path to API key
   file",default = os.path.expanduser("~/.wigle_api"),
   type = argparse.FileType('r'))
   args = parser.parse_args()

现在,以下代码行将检查输入文件是否存在并且是否为文件。如果不是,则退出脚本:

if not os.path.exists(args.INPUT_FILE) or \ not os.path.isfile(args.INPUT_FILE):
   print("[-] {} does not exist or is not a
file".format(args.INPUT_FILE))
   sys.exit(1)
directory = os.path.dirname(args.OUTPUT_CSV)
if directory != '' and not os.path.exists(directory):
   os.makedirs(directory)
api_key = args.api.readline().strip().split(":")

现在,将参数传递给 main,如下所示:

main(args.INPUT_FILE, args.OUTPUT_CSV, args.t, api_key)
def main(in_file, out_csv, type, api_key):
   if type == 'xml':
      wifi = parse_xml(in_file)
   else:
      wifi = parse_txt(in_file)
query_wigle(wifi, out_csv, api_key)

现在,我们将解析 XML 文件,如下所示:

def parse_xml(xml_file):
   wifi = {}
   xmlns = "{http://pa.cellebrite.com/report/2.0}"
   print("[+] Opening {} report".format(xml_file))
   
   xml_tree = ET.parse(xml_file)
   print("[+] Parsing report for all connected WiFi addresses")
   
   root = xml_tree.getroot()

现在,遍历根的子元素,如下所示:

for child in root.iter():
   if child.tag == xmlns + "model":
      if child.get("type") == "Location":
         for field in child.findall(xmlns + "field"):
            if field.get("name") == "TimeStamp":
               ts_value = field.find(xmlns + "value")
               try:
               ts = ts_value.text
               except AttributeError:
continue

现在,我们将检查值文本中是否存在“ssid”字符串:

if "SSID" in value.text:
   bssid, ssid = value.text.split("\t")
   bssid = bssid[7:]
   ssid = ssid[6:]

现在,我们需要将 BSSID、SSID 和时间戳添加到 wifi 字典中,如下所示:

if bssid in wifi.keys():

wifi[bssid]["Timestamps"].append(ts)
   wifi[bssid]["SSID"].append(ssid)
else:
   wifi[bssid] = {"Timestamps": [ts], "SSID":
[ssid],"Wigle": {}}
return wifi

文本解析器比 XML 解析器简单得多,如下所示:

def parse_txt(txt_file):
   wifi = {}
   print("[+] Extracting MAC addresses from {}".format(txt_file))
   
   with open(txt_file) as mac_file:
      for line in mac_file:
         wifi[line.strip()] = {"Timestamps": ["N/A"], "SSID":
["N/A"],"Wigle": {}}
return wifi

现在,让我们使用 requests 模块进行WIGLE API调用,并需要继续执行query_wigle()方法:

def query_wigle(wifi_dictionary, out_csv, api_key):
   print("[+] Querying Wigle.net through Python API for {} "
"APs".format(len(wifi_dictionary)))
   for mac in wifi_dictionary:

   wigle_results = query_mac_addr(mac, api_key)
def query_mac_addr(mac_addr, api_key):

   query_url = "https://api.wigle.net/api/v2/network/search?" \
"onlymine = false&freenet = false&paynet = false" \ "&netid = {}".format(mac_addr)
   req = requests.get(query_url, auth = (api_key[0], api_key[1]))
   return req.json()

实际上,WIGLE API 调用每天都有限制,如果超过该限制,则必须显示以下错误:

try:
   if wigle_results["resultCount"] == 0:
      wifi_dictionary[mac]["Wigle"]["results"] = []
         continue
   else:
      wifi_dictionary[mac]["Wigle"] = wigle_results
except KeyError:
   if wigle_results["error"] == "too many queries today":
      print("[-] Wigle daily query limit exceeded")
      wifi_dictionary[mac]["Wigle"]["results"] = []
      continue
   else:
      print("[-] Other error encountered for " "address {}: {}".format(mac,
wigle_results['error']))
   wifi_dictionary[mac]["Wigle"]["results"] = []
   continue
prep_output(out_csv, wifi_dictionary)

现在,我们将使用prep_output()方法将字典展平为易于写入的块:

def prep_output(output, data):
   csv_data = {}
   google_map = https://www.google.com/maps/search/

现在,访问我们到目前为止收集的所有数据,如下所示:

for x, mac in enumerate(data):
   for y, ts in enumerate(data[mac]["Timestamps"]):
      for z, result in enumerate(data[mac]["Wigle"]["results"]):
         shortres = data[mac]["Wigle"]["results"][z]
         g_map_url = "{}{},{}".format(google_map, shortres["trilat"],shortres["trilong"])

现在,我们可以像本章前面脚本中所做的那样,使用write_csv()函数将输出写入 CSV 文件。

调查嵌入式元数据

在本章中,我们将详细了解如何使用 Python 数字取证调查嵌入式元数据。

简介

嵌入式元数据是指存储在同一文件中的有关数据的信息,该文件包含由该数据描述的对象。换句话说,它是存储在数字文件本身中的有关数字资产的信息。它始终与文件相关联,并且永远无法分离。

在数字取证的情况下,我们无法提取有关特定文件的所有信息。另一方面,嵌入式元数据可以为我们提供对调查至关重要的信息。例如,文本文件的元数据可能包含有关作者、长度、编写日期甚至该文档简要摘要的信息。数字图像可能包含诸如图像长度、快门速度等元数据。

包含元数据属性的工件及其提取

在本节中,我们将了解包含元数据属性的各种工件及其使用 Python 的提取过程。

音频和视频

这是两个非常常见的具有嵌入式元数据的工件。可以提取此元数据以用于调查目的。

您可以使用以下 Python 脚本从音频或 MP3 文件以及视频或 MP4 文件中提取常见属性或元数据。

请注意,对于此脚本,我们需要安装一个名为 mutagen 的第三方 python 库,它允许我们从音频和视频文件中提取元数据。它可以使用以下命令安装:

pip install mutagen

我们在此 Python 脚本中需要导入的一些有用库如下所示:

from __future__ import print_function

import argparse
import json
import mutagen

命令行处理程序将接受一个参数,该参数表示 MP3 或 MP4 文件的路径。然后,我们将使用mutagen.file()方法打开文件句柄,如下所示:

if __name__ == '__main__':
   parser = argparse.ArgumentParser('Python Metadata Extractor')
   parser.add_argument("AV_FILE", help="File to extract metadata from")
   args = parser.parse_args()
   av_file = mutagen.File(args.AV_FILE)
   file_ext = args.AV_FILE.rsplit('.', 1)[-1]
   
   if file_ext.lower() == 'mp3':
      handle_id3(av_file)
   elif file_ext.lower() == 'mp4':
      handle_mp4(av_file)

现在,我们需要使用两个句柄,一个用于从 MP3 中提取数据,另一个用于从 MP4 文件中提取数据。我们可以如下定义这些句柄:

def handle_id3(id3_file):
   id3_frames = {'TIT2': 'Title', 'TPE1': 'Artist', 'TALB': 'Album','TXXX':
      'Custom', 'TCON': 'Content Type', 'TDRL': 'Date released','COMM': 'Comments',
         'TDRC': 'Recording Date'}
   print("{:15} | {:15} | {:38} | {}".format("Frame", "Description","Text","Value"))
   print("-" * 85)
   
   for frames in id3_file.tags.values():
      frame_name = id3_frames.get(frames.FrameID, frames.FrameID)
      desc = getattr(frames, 'desc', "N/A")
      text = getattr(frames, 'text', ["N/A"])[0]
      value = getattr(frames, 'value', "N/A")
      
      if "date" in frame_name.lower():
         text = str(text)
      print("{:15} | {:15} | {:38} | {}".format(
         frame_name, desc, text, value))
def handle_mp4(mp4_file):
   cp_sym = u"\u00A9"
   qt_tag = {
      cp_sym + 'nam': 'Title', cp_sym + 'art': 'Artist',
      cp_sym + 'alb': 'Album', cp_sym + 'gen': 'Genre',
      'cpil': 'Compilation', cp_sym + 'day': 'Creation Date',
      'cnID': 'Apple Store Content ID', 'atID': 'Album Title ID',
      'plID': 'Playlist ID', 'geID': 'Genre ID', 'pcst': 'Podcast',
      'purl': 'Podcast URL', 'egid': 'Episode Global ID',
      'cmID': 'Camera ID', 'sfID': 'Apple Store Country',
      'desc': 'Description', 'ldes': 'Long Description'}
genre_ids = json.load(open('apple_genres.json'))

现在,我们需要如下遍历此 MP4 文件:

print("{:22} | {}".format('Name', 'Value'))
print("-" * 40)

for name, value in mp4_file.tags.items():
   tag_name = qt_tag.get(name, name)
   
   if isinstance(value, list):
      value = "; ".join([str(x) for x in value])
   if name == 'geID':
      value = "{}: {}".format(
      value, genre_ids[str(value)].replace("|", " - "))
   print("{:22} | {}".format(tag_name, value))

上述脚本将为我们提供有关 MP3 和 MP4 文件的其他信息。

图像

图像可能包含不同类型的元数据,具体取决于其文件格式。但是,大多数图像都嵌入 GPS 信息。我们可以使用第三方 Python 库提取此 GPS 信息。您可以使用以下 Python 脚本执行此操作:

首先,如下下载名为 **Python Imaging Library (PIL)** 的第三方 Python 库:

pip install pillow

这将有助于我们从图像中提取元数据。

我们还可以将嵌入在图像中的 GPS 详细信息写入 KML 文件,但为此我们需要下载名为 **simplekml** 的第三方 Python 库,如下所示:

pip install simplekml

在此脚本中,首先需要导入以下库:

from __future__ import print_function
import argparse

from PIL import Image
from PIL.ExifTags import TAGS

import simplekml
import sys

现在,命令行处理程序将接受一个位置参数,该参数基本上表示照片的文件路径。

parser = argparse.ArgumentParser('Metadata from images')
parser.add_argument('PICTURE_FILE', help = "Path to picture")
args = parser.parse_args()

现在,我们需要指定将填充坐标信息的 URL。这些 URL 是 **gmaps** 和 **open_maps**。我们还需要一个函数来将 PIL 库提供的度分秒 (DMS) 元组坐标转换为十进制。可以按如下方式执行:

gmaps = "https://www.google.com/maps?q={},{}"
open_maps = "http://www.openstreetmap.org/?mlat={}&mlon={}"

def process_coords(coord):
   coord_deg = 0
   
   for count, values in enumerate(coord):
      coord_deg += (float(values[0]) / values[1]) / 60**count
   return coord_deg

现在,我们将使用 **image.open()** 函数将文件作为 PIL 对象打开。

img_file = Image.open(args.PICTURE_FILE)
exif_data = img_file._getexif()

if exif_data is None:
   print("No EXIF data found")
   sys.exit()
for name, value in exif_data.items():
   gps_tag = TAGS.get(name, name)
   if gps_tag is not 'GPSInfo':
      continue

找到 **GPSInfo** 标签后,我们将存储 GPS 参考并使用 **process_coords()** 方法处理坐标。

lat_ref = value[1] == u'N'
lat = process_coords(value[2])

if not lat_ref:
   lat = lat * -1
lon_ref = value[3] == u'E'
lon = process_coords(value[4])

if not lon_ref:
   lon = lon * -1

现在,从 **simplekml** 库初始化 **kml** 对象,如下所示:

kml = simplekml.Kml()
kml.newpoint(name = args.PICTURE_FILE, coords = [(lon, lat)])
kml.save(args.PICTURE_FILE + ".kml")

现在我们可以从处理后的信息中打印坐标,如下所示:

print("GPS Coordinates: {}, {}".format(lat, lon))
print("Google Maps URL: {}".format(gmaps.format(lat, lon)))
print("OpenStreetMap URL: {}".format(open_maps.format(lat, lon)))
print("KML File {} created".format(args.PICTURE_FILE + ".kml"))

PDF 文档

PDF 文档包含各种媒体,包括图像、文本、表单等。当我们提取 PDF 文档中嵌入的元数据时,我们可能会以称为可扩展元数据平台 (XMP) 的格式获得结果数据。我们可以借助以下 Python 代码提取元数据:

首先,安装名为 **PyPDF2** 的第三方 Python 库以读取 XMP 格式存储的元数据。可以按如下方式安装:

pip install PyPDF2

现在,导入以下库以从 PDF 文件中提取元数据:

from __future__ import print_function
from argparse import ArgumentParser, FileType

import datetime
from PyPDF2 import PdfFileReader
import sys

现在,命令行处理程序将接受一个位置参数,该参数基本上表示 PDF 文件的文件路径。

parser = argparse.ArgumentParser('Metadata from PDF')
parser.add_argument('PDF_FILE', help='Path to PDF file',type=FileType('rb'))
args = parser.parse_args()

现在,我们可以使用 **getXmpMetadata()** 方法提供一个包含可用元数据的对象,如下所示:

pdf_file = PdfFileReader(args.PDF_FILE)
xmpm = pdf_file.getXmpMetadata()

if xmpm is None:
   print("No XMP metadata found in document.")
   sys.exit()

我们可以使用 **custom_print()** 方法提取并打印相关值,例如标题、创建者、贡献者等,如下所示:

custom_print("Title: {}", xmpm.dc_title)
custom_print("Creator(s): {}", xmpm.dc_creator)
custom_print("Contributors: {}", xmpm.dc_contributor)
custom_print("Subject: {}", xmpm.dc_subject)
custom_print("Description: {}", xmpm.dc_description)
custom_print("Created: {}", xmpm.xmp_createDate)
custom_print("Modified: {}", xmpm.xmp_modifyDate)
custom_print("Event Dates: {}", xmpm.dc_date)

如果 PDF 是使用多个软件创建的,我们也可以定义 **custom_print()** 方法,如下所示:

def custom_print(fmt_str, value):
   if isinstance(value, list):
      print(fmt_str.format(", ".join(value)))
   elif isinstance(value, dict):
      fmt_value = [":".join((k, v)) for k, v in value.items()]
      print(fmt_str.format(", ".join(value)))
   elif isinstance(value, str) or isinstance(value, bool):
      print(fmt_str.format(value))
   elif isinstance(value, bytes):
      print(fmt_str.format(value.decode()))
   elif isinstance(value, datetime.datetime):
      print(fmt_str.format(value.isoformat()))
   elif value is None:
      print(fmt_str.format("N/A"))
   else:
      print("warn: unhandled type {} found".format(type(value)))

我们还可以提取软件保存的任何其他自定义属性,如下所示:

if xmpm.custom_properties:
   print("Custom Properties:")
   
   for k, v in xmpm.custom_properties.items():
      print("\t{}: {}".format(k, v))

上述脚本将读取 PDF 文档,并打印以 XMP 格式存储的元数据,包括软件存储的一些自定义属性,借助这些属性创建了该 PDF。

Windows 可执行文件

有时我们可能会遇到可疑或未经授权的可执行文件。但出于调查目的,它可能很有用,因为其中嵌入了元数据。我们可以获取诸如其位置、用途以及其他属性(例如制造商、编译日期等)的信息。借助以下 Python 脚本,我们可以获取编译日期、标题中的有用数据以及导入和导出的符号。

为此,首先安装第三方 Python 库 **pefile**。可以按如下方式执行:

pip install pefile

成功安装后,如下导入以下库:

from __future__ import print_function

import argparse
from datetime import datetime
from pefile import PE

现在,命令行处理程序将接受一个位置参数,该参数基本上表示可执行文件的文件路径。您还可以选择输出样式,是需要详细和详细的方式还是简化方式。为此,您需要给出如下所示的可选参数:

parser = argparse.ArgumentParser('Metadata from executable file')
parser.add_argument("EXE_FILE", help = "Path to exe file")
parser.add_argument("-v", "--verbose", help = "Increase verbosity of output",
action = 'store_true', default = False)
args = parser.parse_args()

现在,我们将使用 PE 类加载输入可执行文件。我们还将使用 **dump_dict()** 方法将可执行数据转储到字典对象中。

pe = PE(args.EXE_FILE)
ped = pe.dump_dict()

我们可以使用下面显示的代码提取基本文件元数据,例如嵌入的作者身份、版本和编译时间。

file_info = {}
for structure in pe.FileInfo:
   if structure.Key == b'StringFileInfo':
      for s_table in structure.StringTable:
         for key, value in s_table.entries.items():
            if value is None or len(value) == 0:
               value = "Unknown"
            file_info[key] = value
print("File Information: ")
print("==================")

for k, v in file_info.items():
   if isinstance(k, bytes):
      k = k.decode()
   if isinstance(v, bytes):
      v = v.decode()
   print("{}: {}".format(k, v))
comp_time = ped['FILE_HEADER']['TimeDateStamp']['Value']
comp_time = comp_time.split("[")[-1].strip("]")
time_stamp, timezone = comp_time.rsplit(" ", 1)
comp_time = datetime.strptime(time_stamp, "%a %b %d %H:%M:%S %Y")
print("Compiled on {} {}".format(comp_time, timezone.strip()))

我们可以从标题中提取有用的数据,如下所示:

for section in ped['PE Sections']:
   print("Section '{}' at {}: {}/{} {}".format(
      section['Name']['Value'], hex(section['VirtualAddress']['Value']),
      section['Misc_VirtualSize']['Value'],
      section['SizeOfRawData']['Value'], section['MD5'])
   )

现在,从可执行文件中提取导入和导出的列表,如下所示:

if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
   print("\nImports: ")
   print("=========")
   
   for dir_entry in pe.DIRECTORY_ENTRY_IMPORT:
      dll = dir_entry.dll
      
      if not args.verbose:
         print(dll.decode(), end=", ")
         continue
      name_list = []
      
      for impts in dir_entry.imports:
         if getattr(impts, "name", b"Unknown") is None:
            name = b"Unknown"
         else:
            name = getattr(impts, "name", b"Unknown")
			name_list.append([name.decode(), hex(impts.address)])
      name_fmt = ["{} ({})".format(x[0], x[1]) for x in name_list]
      print('- {}: {}'.format(dll.decode(), ", ".join(name_fmt)))
   if not args.verbose:
      print()

现在,使用如下所示的代码打印 **exports**、**names** 和 **addresses**:

if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
   print("\nExports: ")
   print("=========")
   
   for sym in pe.DIRECTORY_ENTRY_EXPORT.symbols:
      print('- {}: {}'.format(sym.name.decode(), hex(sym.address)))

上述脚本将从 Windows 可执行文件中提取基本元数据、标题信息。

Office 文档元数据

大多数计算机工作都通过 MS Office 的三个应用程序完成——Word、PowerPoint 和 Excel。这些文件拥有大量的元数据,可以揭示有关其作者身份和历史的有趣信息。

请注意,Word(.docx)、Excel(.xlsx)和 PowerPoint(.pptx)的 2007 格式的元数据存储在 XML 文件中。我们可以使用以下 Python 脚本在 Python 中处理这些 XML 文件:

首先,导入如下所示的必需库:

from __future__ import print_function
from argparse import ArgumentParser
from datetime import datetime as dt
from xml.etree import ElementTree as etree

import zipfile
parser = argparse.ArgumentParser('Office Document Metadata’)
parser.add_argument("Office_File", help="Path to office file to read")
args = parser.parse_args()

现在,检查文件是否为 ZIP 文件。否则,引发错误。现在,打开文件并提取用于处理的关键元素,使用以下代码:

zipfile.is_zipfile(args.Office_File)
zfile = zipfile.ZipFile(args.Office_File)
core_xml = etree.fromstring(zfile.read('docProps/core.xml'))
app_xml = etree.fromstring(zfile.read('docProps/app.xml'))

现在,创建一个字典来初始化元数据提取:

core_mapping = {
   'title': 'Title',
   'subject': 'Subject',
   'creator': 'Author(s)',
   'keywords': 'Keywords',
   'description': 'Description',
   'lastModifiedBy': 'Last Modified By',
   'modified': 'Modified Date',
   'created': 'Created Date',
   'category': 'Category',
   'contentStatus': 'Status',
   'revision': 'Revision'
}

使用 **iterchildren()** 方法访问 XML 文件中的每个标签:

for element in core_xml.getchildren():
   for key, title in core_mapping.items():
      if key in element.tag:
         if 'date' in title.lower():
            text = dt.strptime(element.text, "%Y-%m-%dT%H:%M:%SZ")
         else:
            text = element.text
         print("{}: {}".format(title, text))

同样,对包含文档内容统计信息的 app.xml 文件执行此操作:

app_mapping = {
   'TotalTime': 'Edit Time (minutes)',
   'Pages': 'Page Count',
   'Words': 'Word Count',
   'Characters': 'Character Count',
   'Lines': 'Line Count',
   'Paragraphs': 'Paragraph Count',
   'Company': 'Company',
   'HyperlinkBase': 'Hyperlink Base',
   'Slides': 'Slide count',
   'Notes': 'Note Count',
   'HiddenSlides': 'Hidden Slide Count',
}
for element in app_xml.getchildren():
   for key, title in app_mapping.items():
      if key in element.tag:
         if 'date' in title.lower():
            text = dt.strptime(element.text, "%Y-%m-%dT%H:%M:%SZ")
         else:
            text = element.text
         print("{}: {}".format(title, text))

现在,在运行上述脚本后,我们可以获得有关特定文档的不同详细信息。请注意,我们只能对 Office 2007 或更高版本的文档应用此脚本。

Python 数字网络取证-I

本章将解释使用 Python 执行网络取证涉及的基本原理。

了解网络取证

网络取证是数字取证的一个分支,它处理对计算机网络流量(本地和广域网 (WAN))的监控和分析,以收集信息、收集证据或入侵检测为目的。网络取证在调查诸如知识产权盗窃或信息泄露等数字犯罪方面发挥着至关重要的作用。网络通信的画面帮助调查人员解决一些关键问题,如下所示:

  • 访问了哪些网站?

  • 在我们的网络上上传了哪种内容?

  • 从我们的网络下载了哪种内容?

  • 正在访问哪些服务器?

  • 是否有人将敏感信息发送到公司防火墙之外?

Internet Evidence Finder (IEF)

IEF 是一种数字取证工具,用于查找、分析和呈现在不同数字媒体(如计算机、智能手机、平板电脑等)上发现的数字证据。它非常流行,被数千名取证专业人员使用。

IEF 的用途

由于其受欢迎程度,IEF 在很大程度上被取证专业人员使用。IEF 的一些用途如下:

  • 由于其强大的搜索功能,它被用于同时搜索多个文件或数据媒体。

  • 它还用于通过新的雕刻技术从 RAM 的未分配空间中恢复已删除的数据。

  • 如果调查人员希望在他们打开的日期以其原始格式重建网页,则可以使用 IEF。

  • 它还用于搜索逻辑或物理磁盘卷。

使用 Python 将 IEF 的报告转储到 CSV

IEF 将数据存储在 SQLite 数据库中,以下 Python 脚本将动态识别 IEF 数据库中的结果表并将它们转储到各自的 CSV 文件中。

此过程按以下步骤完成

  • 首先,生成 IEF 结果数据库,该数据库将是一个以 .db 为扩展名的 SQLite 数据库文件。

  • 然后,查询该数据库以识别所有表。

  • 最后,将这些结果表写入单个 CSV 文件。

Python 代码

让我们看看如何为此目的使用 Python 代码:

对于 Python 脚本,如下导入必要的库:

from __future__ import print_function

import argparse
import csv
import os
import sqlite3
import sys

现在,我们需要提供 IEF 数据库文件的路径:

if __name__ == '__main__':
   parser = argparse.ArgumentParser('IEF to CSV')
   parser.add_argument("IEF_DATABASE", help="Input IEF database")
   parser.add_argument("OUTPUT_DIR", help="Output DIR")
   args = parser.parse_args()

现在,我们将确认 IEF 数据库是否存在,如下所示:

if not os.path.exists(args.OUTPUT_DIR):
   os.makedirs(args.OUTPUT_DIR)
if os.path.exists(args.IEF_DATABASE) and \ os.path.isfile(args.IEF_DATABASE):
   main(args.IEF_DATABASE, args.OUTPUT_DIR)
else:
   print("[-] Supplied input file {} does not exist or is not a " "file".format(args.IEF_DATABASE))
   sys.exit(1)

现在,正如我们在之前的脚本中所做的那样,如下与 SQLite 数据库建立连接,以便通过游标执行查询:

def main(database, out_directory):
   print("[+] Connecting to SQLite database")
   conn = sqlite3.connect(database)
   c = conn.cursor()

以下代码行将从数据库中获取表的名称:

print("List of all tables to extract")
c.execute("select * from sqlite_master where type = 'table'")
tables = [x[2] for x in c.fetchall() if not x[2].startswith('_') and not x[2].endswith('_DATA')]

现在,我们将从表中选择所有数据,并使用游标对象上的 **fetchall()** 方法将包含表数据的元组列表完整地存储在一个变量中:

print("Dumping {} tables to CSV files in {}".format(len(tables), out_directory))

for table in tables:
c.execute("pragma table_info('{}')".format(table))
table_columns = [x[1] for x in c.fetchall()]

c.execute("select * from '{}'".format(table))
table_data = c.fetchall()

现在,使用 **CSV_Writer()** 方法,我们将内容写入 CSV 文件:

csv_name = table + '.csv'
csv_path = os.path.join(out_directory, csv_name)
print('[+] Writing {} table to {} CSV file'.format(table,csv_name))

with open(csv_path, "w", newline = "") as csvfile:
   csv_writer = csv.writer(csvfile)
   csv_writer.writerow(table_columns)
   csv_writer.writerows(table_data)

上述脚本将从 IEF 数据库的表中获取所有数据,并将内容写入我们选择的 CSV 文件。

使用缓存数据

从 IEF 结果数据库中,我们可以获取更多 IEF 本身不一定支持的信息。我们可以通过使用 IEF 结果数据库来获取缓存数据,这是来自 Yahoo、Google 等电子邮件服务提供商的信息副产品。

以下是使用 IEF 数据库访问在 Google Chrome 上访问的 Yahoo 邮件的缓存数据信息的 Python 脚本。请注意,步骤与上一个 Python 脚本中的步骤大致相同。

首先,如下导入 Python 的必要库:

from __future__ import print_function
import argparse
import csv
import os
import sqlite3
import sys
import json

现在,提供 IEF 数据库文件的路径以及命令行处理程序接受的两个位置参数,如上一个脚本中所做的那样:

if __name__ == '__main__':
   parser = argparse.ArgumentParser('IEF to CSV')
   parser.add_argument("IEF_DATABASE", help="Input IEF database")
   parser.add_argument("OUTPUT_DIR", help="Output DIR")
   args = parser.parse_args()

现在,确认 IEF 数据库是否存在,如下所示:

directory = os.path.dirname(args.OUTPUT_CSV)

if not os.path.exists(directory):os.makedirs(directory)
if os.path.exists(args.IEF_DATABASE) and \ os.path.isfile(args.IEF_DATABASE):
   main(args.IEF_DATABASE, args.OUTPUT_CSV)
   else: print("Supplied input file {} does not exist or is not a " "file".format(args.IEF_DATABASE))
sys.exit(1)

现在,如下与 SQLite 数据库建立连接,以便通过游标执行查询:

def main(database, out_csv):
   print("[+] Connecting to SQLite database")
   conn = sqlite3.connect(database)
   c = conn.cursor()

您可以使用以下代码行来获取 Yahoo Mail 联系人缓存记录的实例:

print("Querying IEF database for Yahoo Contact Fragments from " "the Chrome Cache Records Table")
   try:
      c.execute("select * from 'Chrome Cache Records' where URL like " "'https://data.mail.yahoo.com" "/classicab/v2/contacts/?format=json%'")
   except sqlite3.OperationalError:
      print("Received an error querying the database --    database may be" "corrupt or not have a Chrome Cache Records table")
      sys.exit(2)

现在,将上述查询返回的元组列表保存到一个变量中,如下所示:

contact_cache = c.fetchall()
contact_data = process_contacts(contact_cache)
write_csv(contact_data, out_csv)

请注意,这里我们将使用两种方法,即 **process_contacts()** 用于设置结果列表以及遍历每个联系人缓存记录,以及 **json.loads()** 将从表中提取的 JSON 数据存储到一个变量中以供进一步操作:

def process_contacts(contact_cache):
   print("[+] Processing {} cache files matching Yahoo contact cache " " data".format(len(contact_cache)))
   results = []
   
   for contact in contact_cache:
      url = contact[0]
      first_visit = contact[1]
      last_visit = contact[2]
      last_sync = contact[3]
      loc = contact[8]
	   contact_json = json.loads(contact[7].decode())
      total_contacts = contact_json["total"]
      total_count = contact_json["count"]
      
      if "contacts" not in contact_json:
         continue
      for c in contact_json["contacts"]:
         name, anni, bday, emails, phones, links = ("", "", "", "", "", "")
            if "name" in c:
            name = c["name"]["givenName"] + " " + \ c["name"]["middleName"] + " " + c["name"]["familyName"]
            
            if "anniversary" in c:
            anni = c["anniversary"]["month"] + \"/" + c["anniversary"]["day"] + "/" + \c["anniversary"]["year"]
            
            if "birthday" in c:
            bday = c["birthday"]["month"] + "/" + \c["birthday"]["day"] + "/" + c["birthday"]["year"]
            
            if "emails" in c:
               emails = ', '.join([x["ep"] for x in c["emails"]])
            
            if "phones" in c:
               phones = ', '.join([x["ep"] for x in c["phones"]])
            
            if "links" in c:
              links = ', '.join([x["ep"] for x in c["links"]])

现在,对于公司、职位和注释,使用 get 方法,如下所示:

company = c.get("company", "")
title = c.get("jobTitle", "")
notes = c.get("notes", "")

现在,我们将元数据和提取的数据元素列表附加到结果列表中,如下所示:

results.append([url, first_visit, last_visit, last_sync, loc, name, bday,anni, emails, phones, links, company, title, notes,total_contacts, total_count])
return results   

现在,使用 **CSV_Writer()** 方法,我们将内容写入 CSV 文件:

def write_csv(data, output):
   print("[+] Writing {} contacts to {}".format(len(data), output))
   with open(output, "w", newline="") as csvfile:
      csv_writer = csv.writer(csvfile)
      csv_writer.writerow([
         "URL", "First Visit (UTC)", "Last Visit (UTC)",
         "Last Sync (UTC)", "Location", "Contact Name", "Bday",
         "Anniversary", "Emails", "Phones", "Links", "Company", "Title",
         "Notes", "Total Contacts", "Count of Contacts in Cache"])
      csv_writer.writerows(data)  

借助上述脚本,我们可以使用 IEF 数据库处理 Yahoo 邮件的缓存数据。

Python 数字网络取证-II

上一章讨论了使用 Python 进行网络取证的一些概念。在本章中,让我们更深入地了解使用 Python 进行网络取证。

使用 Beautiful Soup 保留网页

万维网 (WWW) 是一个独特的资源信息库。然而,由于内容丢失的速度惊人,其遗产面临着巨大的风险。许多文化遗产和学术机构、非营利组织和私营企业都探索了相关问题,并为网络存档的技术解决方案的开发做出了贡献。

网页保留或网络存档是从万维网收集数据,确保数据保存在存档中,并使其可供未来的研究人员、历史学家和公众使用。在进一步深入网页保留之前,让我们先讨论一些与网页保留相关的重要问题,如下所示:

  • 网络资源的变化 - 网络资源每天都在变化,这对网页保留提出了挑战。

  • 大量资源 - 网页保留相关的另一个问题是需要保留的大量资源。

  • 完整性 - 必须保护网页免受未经授权的修改、删除或移除,以保护其完整性。

  • 处理多媒体数据 - 保留网页时,我们还需要处理多媒体数据,这可能会导致一些问题。

  • 提供访问权限 - 除了保留之外,还需要解决提供对网络资源的访问权限以及处理所有权问题。

在本章中,我们将使用名为Beautiful Soup的 Python 库来进行网页保留。

什么是 Beautiful Soup?

Beautiful Soup 是一个 Python 库,用于从 HTML 和 XML 文件中提取数据。它可以与urlib一起使用,因为它需要一个输入(文档或 URL)来创建 soup 对象,因为它本身无法获取网页。您可以在www.crummy.com/software/BeautifulSoup/bs4/doc/详细了解此内容。

请注意,在使用它之前,我们必须使用以下命令安装第三方库:

pip install bs4

接下来,使用 Anaconda 包管理器,我们可以如下安装 Beautiful Soup:

conda install -c anaconda beautifulsoup4

用于保留网页的 Python 脚本

这里讨论了使用名为 Beautiful Soup 的第三方库来保留网页的 Python 脚本:

首先,导入所需的库,如下所示:

from __future__ import print_function
import argparse

from bs4 import BeautifulSoup, SoupStrainer
from datetime import datetime

import hashlib
import logging
import os
import ssl
import sys
from urllib.request import urlopen

import urllib.error
logger = logging.getLogger(__name__)

请注意,此脚本将接受两个位置参数,一个是需要保留的 URL,另一个是所需的输出目录,如下所示:

if __name__ == "__main__":
   parser = argparse.ArgumentParser('Web Page preservation')
   parser.add_argument("DOMAIN", help="Website Domain")
   parser.add_argument("OUTPUT_DIR", help="Preservation Output Directory")
   parser.add_argument("-l", help="Log file path",
   default=__file__[:-3] + ".log")
   args = parser.parse_args()

现在,通过指定一个文件和流处理程序来设置脚本的日志记录,以便循环并记录获取过程,如下所示:

logger.setLevel(logging.DEBUG)
msg_fmt = logging.Formatter("%(asctime)-15s %(funcName)-10s""%(levelname)-8s %(message)s")
strhndl = logging.StreamHandler(sys.stderr)
strhndl.setFormatter(fmt=msg_fmt)
fhndl = logging.FileHandler(args.l, mode='a')
fhndl.setFormatter(fmt=msg_fmt)

logger.addHandler(strhndl)
logger.addHandler(fhndl)
logger.info("Starting BS Preservation")
logger.debug("Supplied arguments: {}".format(sys.argv[1:]))
logger.debug("System " + sys.platform)
logger.debug("Version " + sys.version)

现在,让我们对所需的输出目录进行输入验证,如下所示:

if not os.path.exists(args.OUTPUT_DIR):
   os.makedirs(args.OUTPUT_DIR)
main(args.DOMAIN, args.OUTPUT_DIR)

现在,我们将定义main()函数,该函数将在提取网站的基本名称之前删除实际名称之前的多余元素,并对输入 URL 进行额外验证,如下所示:

def main(website, output_dir):
   base_name = website.replace("https://", "").replace("http://", "").replace("www.", "")
   link_queue = set()
   
   if "http://" not in website and "https://" not in website:
      logger.error("Exiting preservation - invalid user input: {}".format(website))
      sys.exit(1)
   logger.info("Accessing {} webpage".format(website))
   context = ssl._create_unverified_context()

现在,我们需要使用 urlopen() 方法打开与 URL 的连接。让我们使用 try-except 块,如下所示:

try:
   index = urlopen(website, context=context).read().decode("utf-8")
except urllib.error.HTTPError as e:
   logger.error("Exiting preservation - unable to access page: {}".format(website))
   sys.exit(2)
logger.debug("Successfully accessed {}".format(website))

接下来的几行代码包含三个函数,如下所述:

  • write_output() 将第一个网页写入输出目录

  • find_links() 函数用于识别此网页上的链接

  • recurse_pages() 函数用于迭代并发现网页上的所有链接。

write_output(website, index, output_dir)
link_queue = find_links(base_name, index, link_queue)
logger.info("Found {} initial links on webpage".format(len(link_queue)))
recurse_pages(website, link_queue, context, output_dir)
logger.info("Completed preservation of {}".format(website))

现在,让我们定义write_output()方法,如下所示:

def write_output(name, data, output_dir, counter=0):
   name = name.replace("http://", "").replace("https://", "").rstrip("//")
   directory = os.path.join(output_dir, os.path.dirname(name))
   
   if not os.path.exists(directory) and os.path.dirname(name) != "":
      os.makedirs(directory)

我们需要记录有关网页的一些详细信息,然后使用hash_data()方法记录数据的哈希值,如下所示:

logger.debug("Writing {} to {}".format(name, output_dir)) logger.debug("Data Hash: {}".format(hash_data(data)))
path = os.path.join(output_dir, name)
path = path + "_" + str(counter)
with open(path, "w") as outfile:
   outfile.write(data)
logger.debug("Output File Hash: {}".format(hash_file(path)))

现在,定义hash_data()方法,借助该方法,我们读取UTF-8编码的数据,然后生成其SHA-256哈希值,如下所示:

def hash_data(data):
   sha256 = hashlib.sha256()
   sha256.update(data.encode("utf-8"))
   return sha256.hexdigest()
def hash_file(file):
   sha256 = hashlib.sha256()
   with open(file, "rb") as in_file:
      sha256.update(in_file.read())
return sha256.hexdigest()

现在,让我们在find_links()方法中从网页数据中创建一个Beautifulsoup对象,如下所示:

def find_links(website, page, queue):
   for link in BeautifulSoup(page, "html.parser",parse_only = SoupStrainer("a", href = True)):
      if website in link.get("href"):
         if not os.path.basename(link.get("href")).startswith("#"):
            queue.add(link.get("href"))
   return queue

现在,我们需要定义recurse_pages()方法,并为其提供网站 URL、当前链接队列、未经验证的 SSL 上下文和输出目录的输入,如下所示:

def recurse_pages(website, queue, context, output_dir):
   processed = []
   counter = 0
   
   while True:
      counter += 1
      if len(processed) == len(queue):
         break
      for link in queue.copy(): if link in processed:
         continue
	   processed.append(link)
      try:
      page = urlopen(link,      context=context).read().decode("utf-8")
      except urllib.error.HTTPError as e:
         msg = "Error accessing webpage: {}".format(link)
         logger.error(msg)
         continue

现在,通过传递链接名称、页面数据、输出目录和计数器,将每个访问的网页的输出写入文件,如下所示:

write_output(link, page, output_dir, counter)
queue = find_links(website, page, queue)
logger.info("Identified {} links throughout website".format(
   len(queue)))

现在,当我们通过提供网站的 URL、输出目录和日志文件的路径来运行此脚本时,我们将获得有关该网页的详细信息,这些信息可用于将来使用。

病毒狩猎

您是否曾经想过取证分析师、安全研究人员和事件响应人员如何能够理解有用软件和恶意软件之间的区别?答案就在问题本身,因为如果不研究黑客快速生成的恶意软件,研究人员和专家就很难区分有用软件和恶意软件。在本节中,让我们讨论VirusShare,这是一个完成此任务的工具。

了解 VirusShare

VirusShare 是最大的私有恶意软件样本集合,为安全研究人员、事件响应人员和取证分析师提供活动恶意代码的样本。它包含超过 3000 万个样本。

VirusShare 的优势在于其免费提供的恶意软件哈希列表。任何人都可以使用这些哈希值来创建一个非常全面的哈希集,并用它来识别潜在的恶意文件。但在使用 VirusShare 之前,我们建议您访问https://virusshare.com以了解更多详细信息。

使用 Python 从 VirusShare 创建换行符分隔的哈希列表

来自 VirusShare 的哈希列表可用于各种取证工具,例如 X-ways 和 EnCase。在下面讨论的脚本中,我们将自动化从 VirusShare 下载哈希列表的过程,以创建换行符分隔的哈希列表。

对于此脚本,我们需要一个名为tqdm的第三方 Python 库,可以如下下载:

pip install tqdm

请注意,在此脚本中,我们首先将读取 VirusShare 哈希页面并动态识别最新的哈希列表。然后,我们将初始化进度条并在所需的范围内下载哈希列表。

首先,导入以下库:

from __future__ import print_function

import argparse
import os
import ssl
import sys
import tqdm

from urllib.request import urlopen
import urllib.error

此脚本将接受一个位置参数,该参数将是哈希集所需的路径:

if __name__ == '__main__':
   parser = argparse.ArgumentParser('Hash set from VirusShare')
   parser.add_argument("OUTPUT_HASH", help = "Output Hashset")
   parser.add_argument("--start", type = int, help = "Optional starting location")
   args = parser.parse_args()

现在,我们将执行标准输入验证,如下所示:

directory = os.path.dirname(args.OUTPUT_HASH)
if not os.path.exists(directory):
   os.makedirs(directory)
if args.start:
   main(args.OUTPUT_HASH, start=args.start)
else:
   main(args.OUTPUT_HASH)

现在,我们需要定义main()函数,并使用**kwargs作为参数,因为这将创建一个字典,我们可以参考它来支持提供的键参数,如下所示:

def main(hashset, **kwargs):
   url = "https://virusshare.com/hashes.4n6"
   print("[+] Identifying hash set range from {}".format(url))
   context = ssl._create_unverified_context()

现在,我们需要使用urlib.request.urlopen()方法打开 VirusShare 哈希页面。我们将使用 try-except 块,如下所示:

try:
   index = urlopen(url, context = context).read().decode("utf-8")
except urllib.error.HTTPError as e:
   print("[-] Error accessing webpage - exiting..")
   sys.exit(1)

现在,从下载的页面中识别最新的哈希列表。您可以通过查找 HTML href标签到 VirusShare 哈希列表的最后一个实例来执行此操作。可以使用以下几行代码完成:

tag = index.rfind(r'a href = "hashes/VirusShare_')
stop = int(index[tag + 27: tag + 27 + 5].lstrip("0"))

if "start" not in kwa<rgs:
   start = 0
else:
   start = kwargs["start"]

if start < 0 or start > stop:
   print("[-] Supplied start argument must be greater than or equal ""to zero but less than the latest hash list, ""currently: {}".format(stop))
sys.exit(2)
print("[+] Creating a hashset from hash lists {} to {}".format(start, stop))
hashes_downloaded = 0

现在,我们将使用tqdm.trange()方法创建循环和进度条,如下所示:

for x in tqdm.trange(start, stop + 1, unit_scale=True,desc="Progress"):
   url_hash = "https://virusshare.com/hashes/VirusShare_"\"{}.md5".format(str(x).zfill(5))
   try:
      hashes = urlopen(url_hash, context=context).read().decode("utf-8")
      hashes_list = hashes.split("\n")
   except urllib.error.HTTPError as e:
      print("[-] Error accessing webpage for hash list {}"" - continuing..".format(x))
   continue

成功执行上述步骤后,我们将以 a+ 模式打开哈希集文本文件,以追加到文本文件的底部。

with open(hashset, "a+") as hashfile:
   for line in hashes_list:
   if not line.startswith("#") and line != "":
      hashes_downloaded += 1
      hashfile.write(line + '\n')
   print("[+] Finished downloading {} hashes into {}".format(
      hashes_downloaded, hashset))

运行上述脚本后,您将获得最新的哈希列表,其中包含以文本格式表示的 MD5 哈希值。

使用电子邮件进行调查

前面的章节讨论了网络取证的重要性、流程和相关概念。在本章中,让我们了解电子邮件在数字取证中的作用以及使用 Python 对其进行调查。

电子邮件在调查中的作用

电子邮件在商务沟通中发挥着非常重要的作用,并且已成为互联网上最重要的应用之一。它们是发送消息和文档的便捷方式,不仅可以通过计算机,还可以通过其他电子设备(如手机和平板电脑)发送。

电子邮件的负面影响是犯罪分子可能会泄露有关其公司的重要信息。因此,近年来,电子邮件在数字取证中的作用日益增强。在数字取证中,电子邮件被视为关键证据,电子邮件报头分析已成为在取证过程中收集证据的重要手段。

调查人员在执行电子邮件取证时具有以下目标:

  • 识别主要犯罪分子
  • 收集必要的证据
  • 展示调查结果
  • 构建案件

电子邮件取证中的挑战

电子邮件取证在调查中发挥着非常重要的作用,因为当今大多数通信都依赖于电子邮件。但是,电子邮件取证调查人员在调查过程中可能会遇到以下挑战:

虚假电子邮件

电子邮件取证中最大的挑战是使用虚假电子邮件,这些电子邮件是通过操纵和编写脚本报头等创建的。在此类别中,犯罪分子还会使用临时电子邮件,这是一种允许注册用户在特定时间段后过期的临时地址接收电子邮件的服务。

欺骗

电子邮件取证中的另一个挑战是欺骗,其中犯罪分子习惯于将电子邮件伪装成他人的电子邮件。在这种情况下,机器将同时接收虚假和原始 IP 地址。

匿名转发电子邮件

在此,电子邮件服务器在转发电子邮件之前会去除电子邮件消息中的识别信息。这给电子邮件调查带来了另一个重大挑战。

电子邮件取证调查中使用的技术

电子邮件取证是对电子邮件来源和内容的研究,作为证据来识别消息的实际发件人和收件人,以及一些其他信息,例如传输日期/时间和发件人的意图。它涉及调查元数据、端口扫描以及关键字搜索。

一些可用于电子邮件取证调查的常见技术包括

  • 报头分析
  • 服务器调查
  • 网络设备调查
  • 发件人邮件指纹
  • 软件嵌入式标识符

在以下各节中,我们将学习如何使用 Python 获取信息以进行电子邮件调查。

从 EML 文件中提取信息

EML 文件基本上是文件格式的电子邮件,广泛用于存储电子邮件消息。它们是跨多个电子邮件客户端(如 Microsoft Outlook、Outlook Express 和 Windows Live Mail)兼容的结构化文本文件。

EML 文件将电子邮件报头、正文内容、附件数据存储为纯文本。它使用 base64 编码二进制数据,并使用 Quoted-Printable (QP) 编码存储内容信息。下面给出了可用于从 EML 文件中提取信息的 Python 脚本:

首先,导入以下 Python 库,如下所示:

from __future__ import print_function
from argparse import ArgumentParser, FileType
from email import message_from_file

import os
import quopri
import base64

在上述库中,quopri用于解码来自 EML 文件的 QP 编码值。任何 base64 编码的数据都可以借助base64库进行解码。

接下来,让我们为命令行处理程序提供参数。请注意,这里它将只接受一个参数,即 EML 文件的路径,如下所示:

if __name__ == '__main__':
   parser = ArgumentParser('Extracting information from EML file')
   parser.add_argument("EML_FILE",help="Path to EML File", type=FileType('r'))
   args = parser.parse_args()
   main(args.EML_FILE)

现在,我们需要定义main()函数,在该函数中,我们将使用来自 email 库的名为message_from_file()的方法读取文件类对象。在这里,我们将通过使用名为emlfile的结果变量来访问报头、正文内容、附件和其他有效负载信息,如以下代码所示:

def main(input_file):
   emlfile = message_from_file(input_file)
   for key, value in emlfile._headers:
      print("{}: {}".format(key, value))
print("\nBody\n")

if emlfile.is_multipart():
   for part in emlfile.get_payload():
      process_payload(part)
else:
   process_payload(emlfile[1])

现在,我们需要在其中定义process_payload()方法,我们将使用get_payload()方法提取消息正文内容。我们将使用quopri.decodestring()函数解码QP编码的数据。我们还将检查内容的MIME类型,以便它可以正确地处理电子邮件的存储。请观察下面给出的代码 -

def process_payload(payload):
   print(payload.get_content_type() + "\n" + "=" * len(payload.get_content_type()))
   body = quopri.decodestring(payload.get_payload())
   
   if payload.get_charset():
      body = body.decode(payload.get_charset())
else:
   try:
      body = body.decode()
   except UnicodeDecodeError:
      body = body.decode('cp1252')

if payload.get_content_type() == "text/html":
   outfile = os.path.basename(args.EML_FILE.name) + ".html"
   open(outfile, 'w').write(body)
elif payload.get_content_type().startswith('application'):
   outfile = open(payload.get_filename(), 'wb')
   body = base64.b64decode(payload.get_payload())
   outfile.write(body)
   outfile.close()
   print("Exported: {}\n".format(outfile.name))
else:
   print(body)

执行上述脚本后,我们将在控制台上获得标题信息以及各种有效负载。

使用Python分析MSG文件

电子邮件有多种不同的格式。MSG是Microsoft Outlook和Exchange使用的一种格式。扩展名为MSG的文件可能包含标题和主消息正文的纯ASCII文本,以及超链接和附件。

在本节中,我们将学习如何使用Outlook API从MSG文件中提取信息。请注意,以下Python脚本仅在Windows上有效。为此,我们需要安装名为pywin32的第三方Python库,如下所示 -

pip install pywin32

现在,使用显示的命令导入以下库 -

from __future__ import print_function
from argparse import ArgumentParser

import os
import win32com.client
import pywintypes

现在,让我们为命令行处理程序提供一个参数。这里它将接受两个参数,一个是MSG文件的路径,另一个是所需的输出文件夹,如下所示 -

if __name__ == '__main__':
   parser = ArgumentParser(‘Extracting information from MSG file’)
   parser.add_argument("MSG_FILE", help="Path to MSG file")
   parser.add_argument("OUTPUT_DIR", help="Path to output folder")
   args = parser.parse_args()
   out_dir = args.OUTPUT_DIR
   
   if not os.path.exists(out_dir):
      os.makedirs(out_dir)
   main(args.MSG_FILE, args.OUTPUT_DIR)

现在,我们需要定义main()函数,在其中我们将调用win32com库来设置Outlook API,这进一步允许访问MAPI命名空间。

def main(msg_file, output_dir):
   mapi = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")
   msg = mapi.OpenSharedItem(os.path.abspath(args.MSG_FILE))
   
   display_msg_attribs(msg)
   display_msg_recipients(msg)
   
   extract_msg_body(msg, output_dir)
   extract_attachments(msg, output_dir)

现在,定义我们在该脚本中使用的不同函数。下面给出的代码显示了定义display_msg_attribs()函数,该函数允许我们显示消息的各种属性,例如主题、收件人、BCC、CC、大小、发件人姓名、发送时间等。

def display_msg_attribs(msg):
   attribs = [
      'Application', 'AutoForwarded', 'BCC', 'CC', 'Class',
      'ConversationID', 'ConversationTopic', 'CreationTime',
      'ExpiryTime', 'Importance', 'InternetCodePage', 'IsMarkedAsTask',
      'LastModificationTime', 'Links','ReceivedTime', 'ReminderSet',
      'ReminderTime', 'ReplyRecipientNames', 'Saved', 'Sender',
      'SenderEmailAddress', 'SenderEmailType', 'SenderName', 'Sent',
      'SentOn', 'SentOnBehalfOfName', 'Size', 'Subject',
      'TaskCompletedDate', 'TaskDueDate', 'To', 'UnRead'
   ]
   print("\nMessage Attributes")
   for entry in attribs:
      print("{}: {}".format(entry, getattr(msg, entry, 'N/A')))

现在,定义display_msg_recipeints()函数,该函数遍历消息并显示收件人详细信息。

def display_msg_recipients(msg):
   recipient_attrib = ['Address', 'AutoResponse', 'Name', 'Resolved', 'Sendable']
   i = 1
   
   while True:
   try:
      recipient = msg.Recipients(i)
   except pywintypes.com_error:
      break
   print("\nRecipient {}".format(i))
   print("=" * 15)
   
   for entry in recipient_attrib:
      print("{}: {}".format(entry, getattr(recipient, entry, 'N/A')))
   i += 1

接下来,我们定义extract_msg_body()函数,该函数从消息中提取正文内容、HTML以及纯文本。

def extract_msg_body(msg, out_dir):
   html_data = msg.HTMLBody.encode('cp1252')
   outfile = os.path.join(out_dir, os.path.basename(args.MSG_FILE))
   
   open(outfile + ".body.html", 'wb').write(html_data)
   print("Exported: {}".format(outfile + ".body.html"))
   body_data = msg.Body.encode('cp1252')
   
   open(outfile + ".body.txt", 'wb').write(body_data)
   print("Exported: {}".format(outfile + ".body.txt"))

接下来,我们将定义extract_attachments()函数,该函数将附件数据导出到所需的输出目录。

def extract_attachments(msg, out_dir):
   attachment_attribs = ['DisplayName', 'FileName', 'PathName', 'Position', 'Size']
   i = 1 # Attachments start at 1
   
   while True:
      try:
         attachment = msg.Attachments(i)
   except pywintypes.com_error:
      break

所有函数定义完成后,我们将使用以下代码行将所有属性打印到控制台 -

print("\nAttachment {}".format(i))
print("=" * 15)
   
for entry in attachment_attribs:
   print('{}: {}'.format(entry, getattr(attachment, entry,"N/A")))
outfile = os.path.join(os.path.abspath(out_dir),os.path.split(args.MSG_FILE)[-1])
   
if not os.path.exists(outfile):
os.makedirs(outfile)
outfile = os.path.join(outfile, attachment.FileName)
attachment.SaveAsFile(outfile)
   
print("Exported: {}".format(outfile))
i += 1

运行上述脚本后,我们将获得控制台窗口中消息及其附件的属性以及输出目录中的多个文件。

使用Python构建来自Google Takeout的MBOX文件

MBOX文件是具有特殊格式的文本文件,用于分割存储在其中的消息。它们通常与UNIX系统、Thunderbolt和Google Takeout相关联。

在本节中,您将看到一个Python脚本,我们将使用该脚本构建从Google Takeout获得的MBOX文件。但在那之前,我们必须知道如何使用我们的Google帐户或Gmail帐户生成这些MBOX文件。

获取Google帐户邮箱到MBX格式

获取Google帐户邮箱意味着备份我们的Gmail帐户。出于各种个人或职业原因,可以进行备份。请注意,Google提供了Gmail数据的备份。要将我们的Google帐户邮箱获取到MBOX格式,您需要按照以下步骤操作 -

  • 打开我的帐户仪表板。

  • 转到“个人信息和隐私”部分,然后选择“控制您的内容”链接。

  • 您可以创建新的存档或管理现有的存档。如果我们点击创建存档链接,那么我们将获得一些复选框,用于我们希望包含的每个Google产品。

  • 选择产品后,我们将能够选择文件类型和存档的最大大小,以及从列表中选择的传递方法。

  • 最后,我们将以MBOX格式获得此备份。

Python 代码

现在,上面讨论的MBOX文件可以使用Python构建,如下所示 -

首先,需要导入Python库,如下所示 -

from __future__ import print_function
from argparse import ArgumentParser

import mailbox
import os
import time
import csv
from tqdm import tqdm

import base64

除了用于解析MBOX文件的mailbox库外,所有库都已在之前的脚本中使用并进行了说明。

现在,为命令行处理程序提供一个参数。这里它将接受两个参数 - 一个是MBOX文件的路径,另一个是所需的输出文件夹。

if __name__ == '__main__':
   parser = ArgumentParser('Parsing MBOX files')
   parser.add_argument("MBOX", help="Path to mbox file")
   parser.add_argument(
      "OUTPUT_DIR",help = "Path to output directory to write report ""and exported content")
   args = parser.parse_args()
   main(args.MBOX, args.OUTPUT_DIR)

现在,将定义main()函数并调用mbox库的mbox类,借助该类,我们可以通过提供其路径来解析MBOX文件 -

def main(mbox_file, output_dir):
   print("Reading mbox file")
   mbox = mailbox.mbox(mbox_file, factory=custom_reader)
   print("{} messages to parse".format(len(mbox)))

现在,为mailbox库定义一个读取器方法,如下所示 -

def custom_reader(data_stream):
   data = data_stream.read()
   try:
      content = data.decode("ascii")
   except (UnicodeDecodeError, UnicodeEncodeError) as e:
      content = data.decode("cp1252", errors="replace")
   return mailbox.mboxMessage(content)

现在,为进一步处理创建一些变量,如下所示 -

parsed_data = []
attachments_dir = os.path.join(output_dir, "attachments")

if not os.path.exists(attachments_dir):
   os.makedirs(attachments_dir)
columns = [
   "Date", "From", "To", "Subject", "X-Gmail-Labels", "Return-Path", "Received", 
   "Content-Type", "Message-ID","X-GM-THRID", "num_attachments_exported", "export_path"]

接下来,使用tqdm生成进度条并跟踪迭代过程,如下所示 -

for message in tqdm(mbox):
   msg_data = dict()
   header_data = dict(message._headers)
for hdr in columns:
   msg_data[hdr] = header_data.get(hdr, "N/A")

现在,检查消息是否包含有效负载。如果包含,我们将定义write_payload()方法,如下所示 -

if len(message.get_payload()):
   export_path = write_payload(message, attachments_dir)
   msg_data['num_attachments_exported'] = len(export_path)
   msg_data['export_path'] = ", ".join(export_path)

现在,需要追加数据。然后我们将调用create_report()方法,如下所示 -

parsed_data.append(msg_data)
create_report(
   parsed_data, os.path.join(output_dir, "mbox_report.csv"), columns)
def write_payload(msg, out_dir):
   pyld = msg.get_payload()
   export_path = []
   
if msg.is_multipart():
   for entry in pyld:
      export_path += write_payload(entry, out_dir)
else:
   content_type = msg.get_content_type()
   if "application/" in content_type.lower():
      content = base64.b64decode(msg.get_payload())
      export_path.append(export_content(msg, out_dir, content))
   elif "image/" in content_type.lower():
      content = base64.b64decode(msg.get_payload())
      export_path.append(export_content(msg, out_dir, content))

   elif "video/" in content_type.lower():
      content = base64.b64decode(msg.get_payload())
      export_path.append(export_content(msg, out_dir, content))
   elif "audio/" in content_type.lower():
      content = base64.b64decode(msg.get_payload())
      export_path.append(export_content(msg, out_dir, content))
   elif "text/csv" in content_type.lower():
      content = base64.b64decode(msg.get_payload())
      export_path.append(export_content(msg, out_dir, content))
   elif "info/" in content_type.lower():
      export_path.append(export_content(msg, out_dir,
      msg.get_payload()))
   elif "text/calendar" in content_type.lower():
      export_path.append(export_content(msg, out_dir,
      msg.get_payload()))
   elif "text/rtf" in content_type.lower():
      export_path.append(export_content(msg, out_dir,
      msg.get_payload()))
   else:
      if "name=" in msg.get('Content-Disposition', "N/A"):
         content = base64.b64decode(msg.get_payload())
      export_path.append(export_content(msg, out_dir, content))
   elif "name=" in msg.get('Content-Type', "N/A"):
      content = base64.b64decode(msg.get_payload())
      export_path.append(export_content(msg, out_dir, content))
return export_path

请注意,上述if-else语句易于理解。现在,我们需要定义一个方法,该方法将从msg对象中提取文件名,如下所示 -

def export_content(msg, out_dir, content_data):
   file_name = get_filename(msg)
   file_ext = "FILE"
   
   if "." in file_name: file_ext = file_name.rsplit(".", 1)[-1]
   file_name = "{}_{:.4f}.{}".format(file_name.rsplit(".", 1)[0], time.time(), file_ext)
   file_name = os.path.join(out_dir, file_name)

现在,借助以下代码行,您实际上可以导出文件 -

if isinstance(content_data, str):
   open(file_name, 'w').write(content_data)
else:
   open(file_name, 'wb').write(content_data)
return file_name

现在,让我们定义一个函数来从message中提取文件名,以准确表示这些文件的名称,如下所示 -

def get_filename(msg):
   if 'name=' in msg.get("Content-Disposition", "N/A"):
      fname_data = msg["Content-Disposition"].replace("\r\n", " ")
      fname = [x for x in fname_data.split("; ") if 'name=' in x]
      file_name = fname[0].split("=", 1)[-1]
   elif 'name=' in msg.get("Content-Type", "N/A"):
      fname_data = msg["Content-Type"].replace("\r\n", " ")
      fname = [x for x in fname_data.split("; ") if 'name=' in x]
      file_name = fname[0].split("=", 1)[-1]
   else:
      file_name = "NO_FILENAME"
   fchars = [x for x in file_name if x.isalnum() or x.isspace() or x == "."]
   return "".join(fchars)

现在,我们可以通过定义create_report()函数来编写CSV文件,如下所示 -

def create_report(output_data, output_file, columns):
   with open(output_file, 'w', newline="") as outfile:
      csvfile = csv.DictWriter(outfile, columns)
      csvfile.writeheader()
      csvfile.writerows(output_data)

运行上述脚本后,我们将获得CSV报告和包含附件的目录。

Windows重要工件-I

本章将解释Microsoft Windows取证中涉及的各种概念以及调查人员可以从调查过程中获得的重要工件。

简介

工件是计算机系统中包含与计算机用户执行的活动相关的重要信息的对象或区域。此信息类型和位置取决于操作系统。在取证分析期间,这些工件在批准或否决调查人员的观察结果方面发挥着非常重要的作用。

Windows工件对取证的重要性

由于以下原因,Windows工件具有重要意义 -

  • 全世界约90%的流量来自使用Windows作为操作系统的计算机。因此,对于数字取证检查员来说,Windows工件非常重要。

  • Windows操作系统存储与用户在计算机系统上的活动相关的不同类型的证据。这是另一个表明Windows工件对数字取证的重要性。

  • 很多时候,调查人员会围绕用户创建的数据等旧的和传统的领域进行调查。Windows工件可以将调查引向非传统领域,例如系统创建的数据或工件。

  • Windows提供了大量的工件,这对调查人员以及执行非正式调查的公司和个人都有帮助。

  • 近年来网络犯罪的增加是Windows工件很重要的另一个原因。

Windows工件及其Python脚本

在本节中,我们将讨论一些Windows工件以及用于从中获取信息的Python脚本。

回收站

它是取证调查中重要的Windows工件之一。Windows回收站包含用户已删除但系统尚未物理删除的文件。即使用户完全从系统中删除了文件,它也仍然是重要的调查来源。这是因为检查员可以从已删除的文件中提取有价值的信息,例如原始文件路径以及将其发送到回收站的时间。

请注意,回收站证据的存储取决于Windows版本。在以下Python脚本中,我们将处理Windows 7,它创建两个文件:$R文件,其中包含已回收文件的实际内容;$I文件,其中包含原始文件名、路径、删除文件时的文件大小。

对于Python脚本,我们需要安装第三方模块,即pytsk3、pyewfunicodecsv。我们可以使用pip来安装它们。我们可以按照以下步骤从回收站中提取信息 -

  • 首先,我们需要使用递归方法扫描$Recycle.bin文件夹并选择所有以$I开头的文件。

  • 接下来,我们将读取文件的内容并解析可用的元数据结构。

  • 现在,我们将搜索关联的$R文件。

  • 最后,我们将结果写入CSV文件以供审查。

让我们看看如何为此目的使用 Python 代码:

首先,我们需要导入以下Python库 -

from __future__ import print_function
from argparse import ArgumentParser

import datetime
import os
import struct

from utility.pytskutil import TSKUtil
import unicodecsv as csv

接下来,我们需要为命令行处理程序提供参数。请注意,这里它将接受三个参数 - 第一个是证据文件的路径,第二个是证据文件的类型,第三个是CSV报告的所需输出路径,如下所示 -

if __name__ == '__main__':
   parser = argparse.ArgumentParser('Recycle Bin evidences')
   parser.add_argument('EVIDENCE_FILE', help = "Path to evidence file")
   parser.add_argument('IMAGE_TYPE', help = "Evidence file format",
   choices = ('ewf', 'raw'))
   parser.add_argument('CSV_REPORT', help = "Path to CSV report")
   args = parser.parse_args()
   main(args.EVIDENCE_FILE, args.IMAGE_TYPE, args.CSV_REPORT)

现在,定义main()函数,该函数将处理所有处理。它将搜索$I文件,如下所示 -

def main(evidence, image_type, report_file):
   tsk_util = TSKUtil(evidence, image_type)
   dollar_i_files = tsk_util.recurse_files("$I", path = '/$Recycle.bin',logic = "startswith")
   
   if dollar_i_files is not None:
      processed_files = process_dollar_i(tsk_util, dollar_i_files)
      write_csv(report_file,['file_path', 'file_size', 'deleted_time','dollar_i_file', 'dollar_r_file', 'is_directory'],processed_files)
   else:
      print("No $I files found")

现在,如果我们找到$I文件,则必须将其发送到process_dollar_i()函数,该函数将接受tsk_util对象以及$I文件的列表,如下所示 -

def process_dollar_i(tsk_util, dollar_i_files):
   processed_files = []
   
   for dollar_i in dollar_i_files:
      file_attribs = read_dollar_i(dollar_i[2])
      if file_attribs is None:
         continue
      file_attribs['dollar_i_file'] = os.path.join('/$Recycle.bin', dollar_i[1][1:])

现在,搜索$R文件,如下所示 -

recycle_file_path = os.path.join('/$Recycle.bin',dollar_i[1].rsplit("/", 1)[0][1:])
dollar_r_files = tsk_util.recurse_files(
   "$R" + dollar_i[0][2:],path = recycle_file_path, logic = "startswith")
   
   if dollar_r_files is None:
      dollar_r_dir = os.path.join(recycle_file_path,"$R" + dollar_i[0][2:])
      dollar_r_dirs = tsk_util.query_directory(dollar_r_dir)
   
   if dollar_r_dirs is None:
      file_attribs['dollar_r_file'] = "Not Found"
      file_attribs['is_directory'] = 'Unknown'
   
   else:
      file_attribs['dollar_r_file'] = dollar_r_dir
      file_attribs['is_directory'] = True
   
   else:
      dollar_r = [os.path.join(recycle_file_path, r[1][1:])for r in dollar_r_files]
      file_attribs['dollar_r_file'] = ";".join(dollar_r)
      file_attribs['is_directory'] = False
      processed_files.append(file_attribs)
   return processed_files  

现在,定义read_dollar_i()方法来读取$I文件,换句话说,解析元数据。我们将使用read_random()方法读取签名的前八个字节。如果签名不匹配,这将返回none。之后,如果这是一个有效的文件,我们将必须从$I文件中读取和解包值。

def read_dollar_i(file_obj):
   if file_obj.read_random(0, 8) != '\x01\x00\x00\x00\x00\x00\x00\x00':
      return None
   raw_file_size = struct.unpack('<q', file_obj.read_random(8, 8))
   raw_deleted_time = struct.unpack('<q',   file_obj.read_random(16, 8))
   raw_file_path = file_obj.read_random(24, 520)

现在,提取这些文件后,我们需要使用sizeof_fmt()函数将整数解释为人可读的值,如下所示 -

file_size = sizeof_fmt(raw_file_size[0])
deleted_time = parse_windows_filetime(raw_deleted_time[0])

file_path = raw_file_path.decode("utf16").strip("\x00")
return {'file_size': file_size, 'file_path': file_path,'deleted_time': deleted_time}

现在,我们需要定义sizeof_fmt()函数,如下所示 -

def sizeof_fmt(num, suffix = 'B'):
   for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
      if abs(num) < 1024.0:
         return "%3.1f%s%s" % (num, unit, suffix)
      num /= 1024.0
   return "%.1f%s%s" % (num, 'Yi', suffix)

现在,定义一个函数将解释的整数转换为格式化的日期和时间,如下所示 -

def parse_windows_filetime(date_value):
   microseconds = float(date_value) / 10
   ts = datetime.datetime(1601, 1, 1) + datetime.timedelta(
      microseconds = microseconds)
   return ts.strftime('%Y-%m-%d %H:%M:%S.%f')

现在,我们将定义write_csv()方法将处理后的结果写入CSV文件,如下所示 -

def write_csv(outfile, fieldnames, data):
   with open(outfile, 'wb') as open_outfile:
      csvfile = csv.DictWriter(open_outfile, fieldnames)
      csvfile.writeheader()
      csvfile.writerows(data)

运行上述脚本后,我们将获得来自$I和$R文件的数据。

便笺

Windows便笺替换了用笔和纸书写的现实习惯。这些便笺用于以不同的颜色、字体等选项浮动在桌面上。在Windows 7中,便笺文件存储为OLE文件,因此在以下Python脚本中,我们将调查此OLE文件以从中提取元数据。便笺。

对于此Python脚本,我们需要安装第三方模块,即olefile、pytsk3、pyewf和unicodecsv。我们可以使用命令pip来安装它们。

我们可以按照下面讨论的步骤从便笺文件中提取信息,即StickyNote.sn -

  • 首先,打开证据文件并找到所有StickyNote.snt文件。

  • 然后,从OLE流中解析元数据和内容,并将RTF内容写入文件。

  • 最后,创建此元数据的CSV报告。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下Python库 -

from __future__ import print_function
from argparse import ArgumentParser

import unicodecsv as csv
import os
import StringIO

from utility.pytskutil import TSKUtil
import olefile

接下来,定义一个全局变量,该变量将在整个脚本中使用 -

REPORT_COLS = ['note_id', 'created', 'modified', 'note_text', 'note_file']

接下来,我们需要为命令行处理程序提供参数。请注意,这里它将接受三个参数 - 第一个是证据文件的路径,第二个是证据文件的类型,第三个是所需的输出路径,如下所示 -

if __name__ == '__main__':
   parser = argparse.ArgumentParser('Evidence from Sticky Notes')
   parser.add_argument('EVIDENCE_FILE', help="Path to evidence file")
   parser.add_argument('IMAGE_TYPE', help="Evidence file format",choices=('ewf', 'raw'))
   parser.add_argument('REPORT_FOLDER', help="Path to report folder")
   args = parser.parse_args()
   main(args.EVIDENCE_FILE, args.IMAGE_TYPE, args.REPORT_FOLDER)

现在,我们将定义main()函数,该函数将类似于前面的脚本,如下所示 -

def main(evidence, image_type, report_folder):
   tsk_util = TSKUtil(evidence, image_type)
   note_files = tsk_util.recurse_files('StickyNotes.snt', '/Users','equals')

现在,让我们遍历生成的文件。然后,我们将调用**parse_snt_file()**函数来处理文件,然后我们将使用**write_note_rtf()**方法写入RTF文件,如下所示:

report_details = []
for note_file in note_files:
   user_dir = note_file[1].split("/")[1]
   file_like_obj = create_file_like_obj(note_file[2])
   note_data = parse_snt_file(file_like_obj)
   
   if note_data is None:
      continue
   write_note_rtf(note_data, os.path.join(report_folder, user_dir))
   report_details += prep_note_report(note_data, REPORT_COLS,"/Users" + note_file[1])
   write_csv(os.path.join(report_folder, 'sticky_notes.csv'), REPORT_COLS,report_details)

接下来,我们需要定义此脚本中使用的各种函数。

首先,我们将定义**create_file_like_obj()**函数,用于通过获取**pytsk**文件对象来读取文件的大小。然后,我们将定义**parse_snt_file()**函数,该函数将文件类对象作为输入,用于读取和解释便签文件。

def parse_snt_file(snt_file):
   
   if not olefile.isOleFile(snt_file):
      print("This is not an OLE file")
      return None
   ole = olefile.OleFileIO(snt_file)
   note = {}
   
   for stream in ole.listdir():
      if stream[0].count("-") == 3:
         if stream[0] not in note:
            note[stream[0]] = {"created": ole.getctime(stream[0]),"modified": ole.getmtime(stream[0])}
         content = None
         if stream[1] == '0':
            content = ole.openstream(stream).read()
         elif stream[1] == '3':
            content = ole.openstream(stream).read().decode("utf-16")
         if content:
            note[stream[0]][stream[1]] = content
	return note

现在,通过定义**write_note_rtf()**函数创建RTF文件,如下所示

def write_note_rtf(note_data, report_folder):
   if not os.path.exists(report_folder):
      os.makedirs(report_folder)
   
   for note_id, stream_data in note_data.items():
      fname = os.path.join(report_folder, note_id + ".rtf")
      with open(fname, 'w') as open_file:
         open_file.write(stream_data['0'])

现在,我们将嵌套字典转换为扁平化的字典列表,这些字典更适合CSV电子表格。这将通过定义**prep_note_report()**函数来完成。最后,我们将定义**write_csv()**函数。

def prep_note_report(note_data, report_cols, note_file):
   report_details = []
   
   for note_id, stream_data in note_data.items():
      report_details.append({
         "note_id": note_id,
         "created": stream_data['created'],
         "modified": stream_data['modified'],
         "note_text": stream_data['3'].strip("\x00"),
         "note_file": note_file
      })
   return report_details
def write_csv(outfile, fieldnames, data):
   with open(outfile, 'wb') as open_outfile:
      csvfile = csv.DictWriter(open_outfile, fieldnames)
      csvfile.writeheader()
      csvfile.writerows(data)

运行上述脚本后,我们将从便签文件中获取元数据。

注册表文件

Windows注册表文件包含许多重要的细节,对于取证分析师来说,这些细节就像一个信息宝库。它是一个分层数据库,包含与操作系统配置、用户活动、软件安装等相关的详细信息。在下面的Python脚本中,我们将从**SYSTEM**和**SOFTWARE**配置单元中访问常见的基线信息。

对于此Python脚本,我们需要安装第三方模块,即**pytsk3、pyewf**和**registry**。我们可以使用**pip**来安装它们。

我们可以按照以下步骤从Windows注册表中提取信息:

  • 首先,根据名称和路径查找要处理的注册表配置单元。

  • 然后,我们使用StringIO和Registry模块打开这些文件。

  • 最后,我们需要处理每个配置单元,并将解析后的值打印到控制台以供解释。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下Python库 -

from __future__ import print_function
from argparse import ArgumentParser

import datetime
import StringIO
import struct

from utility.pytskutil import TSKUtil
from Registry import Registry

现在,为命令行处理程序提供参数。这里它将接受两个参数 - 第一个是证据文件的路径,第二个是证据文件的类型,如下所示:

if __name__ == '__main__':
   parser = argparse.ArgumentParser('Evidence from Windows Registry')
   parser.add_argument('EVIDENCE_FILE', help = "Path to evidence file")
   parser.add_argument('IMAGE_TYPE', help = "Evidence file format",
   choices = ('ewf', 'raw'))
   args = parser.parse_args()
   main(args.EVIDENCE_FILE, args.IMAGE_TYPE)

现在,我们将定义**main()**函数,用于在**/Windows/System32/config**文件夹中搜索**SYSTEM**和**SOFTWARE**配置单元,如下所示:

def main(evidence, image_type):
   tsk_util = TSKUtil(evidence, image_type)
   tsk_system_hive = tsk_util.recurse_files('system', '/Windows/system32/config', 'equals')
   tsk_software_hive = tsk_util.recurse_files('software', '/Windows/system32/config', 'equals')
   system_hive = open_file_as_reg(tsk_system_hive[0][2])
   software_hive = open_file_as_reg(tsk_software_hive[0][2])
   process_system_hive(system_hive)
   process_software_hive(software_hive)

现在,定义打开注册表文件的函数。为此,我们需要从**pytsk**元数据中获取文件大小,如下所示:

def open_file_as_reg(reg_file):
   file_size = reg_file.info.meta.size
   file_content = reg_file.read_random(0, file_size)
   file_like_obj = StringIO.StringIO(file_content)
   return Registry.Registry(file_like_obj)

现在,借助以下方法,我们可以处理**SYSTEM>**配置单元:

def process_system_hive(hive):
   root = hive.root()
   current_control_set = root.find_key("Select").value("Current").value()
   control_set = root.find_key("ControlSet{:03d}".format(current_control_set))
   raw_shutdown_time = struct.unpack(
      '<Q', control_set.find_key("Control").find_key("Windows").value("ShutdownTime").value())
   
   shutdown_time = parse_windows_filetime(raw_shutdown_time[0])
   print("Last Shutdown Time: {}".format(shutdown_time))
   
   time_zone = control_set.find_key("Control").find_key("TimeZoneInformation")
      .value("TimeZoneKeyName").value()
   
   print("Machine Time Zone: {}".format(time_zone))
   computer_name = control_set.find_key("Control").find_key("ComputerName").find_key("ComputerName")
      .value("ComputerName").value()
   
   print("Machine Name: {}".format(computer_name))
   last_access = control_set.find_key("Control").find_key("FileSystem")
      .value("NtfsDisableLastAccessUpdate").value()
   last_access = "Disabled" if last_access == 1 else "enabled"
   print("Last Access Updates: {}".format(last_access))

现在,我们需要定义一个函数,将解释的整数格式化为日期和时间,如下所示:

def parse_windows_filetime(date_value):
   microseconds = float(date_value) / 10
   ts = datetime.datetime(1601, 1, 1) + datetime.timedelta(microseconds = microseconds)
   return ts.strftime('%Y-%m-%d %H:%M:%S.%f')

def parse_unix_epoch(date_value):
   ts = datetime.datetime.fromtimestamp(date_value)
   return ts.strftime('%Y-%m-%d %H:%M:%S.%f')

现在,借助以下方法,我们可以处理**SOFTWARE**配置单元:

def process_software_hive(hive):
   root = hive.root()
   nt_curr_ver = root.find_key("Microsoft").find_key("Windows NT")
      .find_key("CurrentVersion")
   
   print("Product name: {}".format(nt_curr_ver.value("ProductName").value()))
   print("CSD Version: {}".format(nt_curr_ver.value("CSDVersion").value()))
   print("Current Build: {}".format(nt_curr_ver.value("CurrentBuild").value()))
   print("Registered Owner: {}".format(nt_curr_ver.value("RegisteredOwner").value()))
   print("Registered Org: 
      {}".format(nt_curr_ver.value("RegisteredOrganization").value()))
   
   raw_install_date = nt_curr_ver.value("InstallDate").value()
   install_date = parse_unix_epoch(raw_install_date)
   print("Installation Date: {}".format(install_date))

运行上述脚本后,我们将获得存储在Windows注册表文件中的元数据。

Windows重要工件-II

本章讨论了Windows中一些更重要的工件及其使用Python的提取方法。

用户活动

Windows拥有**NTUSER.DAT**文件用于存储各种用户活动。每个用户配置文件都具有类似**NTUSER.DAT**的配置单元,其中存储与该用户相关的信息和配置。因此,对于取证分析师进行调查非常有用。

以下Python脚本将解析**NTUSER.DAT**的一些键,以探索用户在系统上的操作。在继续之前,对于Python脚本,我们需要安装第三方模块,即**Registry、pytsk3、pyewf**和**Jinja2**。我们可以使用pip来安装它们。

我们可以按照以下步骤从**NTUSER.DAT**文件中提取信息:

  • 首先,搜索系统中的所有**NTUSER.DAT**文件。

  • 然后,为每个**NTUSER.DAT**文件解析**WordWheelQuery、TypePath和RunMRU**键。

  • 最后,我们将使用**Jinja2** fmodule将这些已处理的工件写入HTML报告。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,我们需要导入以下Python模块:

from __future__ import print_function
from argparse import ArgumentParser

import os
import StringIO
import struct

from utility.pytskutil import TSKUtil
from Registry import Registry
import jinja2

现在,为命令行处理程序提供参数。这里它将接受三个参数 - 第一个是证据文件的路径,第二个是证据文件的类型,第三个是HTML报告的所需输出路径,如下所示:

if __name__ == '__main__':
   parser = argparse.ArgumentParser('Information from user activities')
   parser.add_argument('EVIDENCE_FILE',help = "Path to evidence file")
   parser.add_argument('IMAGE_TYPE',help = "Evidence file format",choices = ('ewf', 'raw'))
   parser.add_argument('REPORT',help = "Path to report file")
   args = parser.parse_args()
   main(args.EVIDENCE_FILE, args.IMAGE_TYPE, args.REPORT)

现在,让我们定义**main()**函数来搜索所有**NTUSER.DAT**文件,如下所示:

def main(evidence, image_type, report):
   tsk_util = TSKUtil(evidence, image_type)
   tsk_ntuser_hives = tsk_util.recurse_files('ntuser.dat','/Users', 'equals')
   
   nt_rec = {
      'wordwheel': {'data': [], 'title': 'WordWheel Query'},
      'typed_path': {'data': [], 'title': 'Typed Paths'},
      'run_mru': {'data': [], 'title': 'Run MRU'}
   }

现在,我们将尝试在**NTUSER.DAT**文件中查找键,一旦找到,就定义用户处理函数,如下所示:

for ntuser in tsk_ntuser_hives:
   uname = ntuser[1].split("/")

open_ntuser = open_file_as_reg(ntuser[2])
try:
   explorer_key = open_ntuser.root().find_key("Software").find_key("Microsoft")
      .find_key("Windows").find_key("CurrentVersion").find_key("Explorer")
   except Registry.RegistryKeyNotFoundException:
      continue
   nt_rec['wordwheel']['data'] += parse_wordwheel(explorer_key, uname)
   nt_rec['typed_path']['data'] += parse_typed_paths(explorer_key, uname)
   nt_rec['run_mru']['data'] += parse_run_mru(explorer_key, uname)
   nt_rec['wordwheel']['headers'] = \ nt_rec['wordwheel']['data'][0].keys()
   nt_rec['typed_path']['headers'] = \ nt_rec['typed_path']['data'][0].keys()
   nt_rec['run_mru']['headers'] = \ nt_rec['run_mru']['data'][0].keys()

现在,将字典对象及其路径传递给**write_html()**方法,如下所示:

write_html(report, nt_rec)

现在,定义一个方法,该方法获取**pytsk**文件句柄并通过**StringIO**类将其读入Registry类。

def open_file_as_reg(reg_file):
   file_size = reg_file.info.meta.size
   file_content = reg_file.read_random(0, file_size)
   file_like_obj = StringIO.StringIO(file_content)
   return Registry.Registry(file_like_obj)

现在,我们将定义一个函数,该函数将解析并处理来自**NTUSER.DAT**文件的**WordWheelQuery**键,如下所示:

def parse_wordwheel(explorer_key, username):
   try:
      wwq = explorer_key.find_key("WordWheelQuery")
   except Registry.RegistryKeyNotFoundException:
      return []
   mru_list = wwq.value("MRUListEx").value()
   mru_order = []
   
   for i in xrange(0, len(mru_list), 2):
      order_val = struct.unpack('h', mru_list[i:i + 2])[0]
   if order_val in mru_order and order_val in (0, -1):
      break
   else:
      mru_order.append(order_val)
   search_list = []
   
   for count, val in enumerate(mru_order):
      ts = "N/A"
      if count == 0:
         ts = wwq.timestamp()
      search_list.append({
         'timestamp': ts,
         'username': username,
         'order': count,
         'value_name': str(val),
         'search': wwq.value(str(val)).value().decode("UTF-16").strip("\x00")
})
   return search_list  

现在,我们将定义一个函数,该函数将解析并处理来自**NTUSER.DAT**文件的**TypedPaths**键,如下所示:

def parse_typed_paths(explorer_key, username):
   try:
      typed_paths = explorer_key.find_key("TypedPaths")
   except Registry.RegistryKeyNotFoundException:
      return []
   typed_path_details = []
   
   for val in typed_paths.values():
      typed_path_details.append({
         "username": username,
         "value_name": val.name(),
         "path": val.value()
      })
   return typed_path_details

现在,我们将定义一个函数,该函数将解析并处理来自**NTUSER.DAT**文件的**RunMRU**键,如下所示:

def parse_run_mru(explorer_key, username):
   try:
      run_mru = explorer_key.find_key("RunMRU")
   except Registry.RegistryKeyNotFoundException:
      return []
   
   if len(run_mru.values()) == 0:
      return []
   mru_list = run_mru.value("MRUList").value()
   mru_order = []
   
   for i in mru_list:
      mru_order.append(i)
   mru_details = []
   
   for count, val in enumerate(mru_order):
      ts = "N/A"
      if count == 0:
         ts = run_mru.timestamp()
      mru_details.append({
         "username": username,
         "timestamp": ts,
         "order": count,
         "value_name": val,
         "run_statement": run_mru.value(val).value()
      })
   return mru_details

现在,以下函数将处理HTML报告的创建:

def write_html(outfile, data_dict):
   cwd = os.path.dirname(os.path.abspath(__file__))
   env = jinja2.Environment(loader=jinja2.FileSystemLoader(cwd))
   template = env.get_template("user_activity.html")
   rendering = template.render(nt_data=data_dict)
   
   with open(outfile, 'w') as open_outfile:
      open_outfile.write(rendering)

最后,我们可以为报告编写HTML文档。运行上述脚本后,我们将以HTML文档格式获得NTUSER.DAT文件中的信息。

LINK文件

当用户或操作系统为经常使用、双击或从系统驱动器(如附加存储)访问的文件创建快捷方式文件时,会创建快捷方式文件。此类快捷方式文件称为链接文件。通过访问这些链接文件,调查人员可以找到窗口的活动,例如访问这些文件的时间和位置。

让我们讨论一下我们可以用来从这些Windows LINK文件中获取信息的Python脚本。

对于Python脚本,安装第三方模块,即**pylnk、pytsk3、pyewf**。我们可以按照以下步骤从**lnk**文件中提取信息

  • 首先,搜索系统内的**lnk**文件。

  • 然后,通过遍历它们来提取文件中的信息。

  • 现在,最后我们需要将此信息写入CSV报告。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下Python库 -

from __future__ import print_function
from argparse import ArgumentParser

import csv
import StringIO

from utility.pytskutil import TSKUtil
import pylnk

现在,为命令行处理程序提供参数。这里它将接受三个参数 - 第一个是证据文件的路径,第二个是证据文件的类型,第三个是CSV报告的所需输出路径,如下所示:

if __name__ == '__main__':
   parser = argparse.ArgumentParser('Parsing LNK files')
   parser.add_argument('EVIDENCE_FILE', help = "Path to evidence file")
   parser.add_argument('IMAGE_TYPE', help = "Evidence file format",choices = ('ewf', 'raw'))
   parser.add_argument('CSV_REPORT', help = "Path to CSV report")
   args = parser.parse_args()
   main(args.EVIDENCE_FILE, args.IMAGE_TYPE, args.CSV_REPORT)

现在,通过创建**TSKUtil**对象来解释证据文件,并遍历文件系统以查找以**lnk**结尾的文件。这可以通过定义**main()**函数来完成,如下所示:

def main(evidence, image_type, report):
   tsk_util = TSKUtil(evidence, image_type)
   lnk_files = tsk_util.recurse_files("lnk", path="/", logic="endswith")
   
   if lnk_files is None:
      print("No lnk files found")
      exit(0)
   columns = [
      'command_line_arguments', 'description', 'drive_serial_number',
      'drive_type', 'file_access_time', 'file_attribute_flags',
      'file_creation_time', 'file_modification_time', 'file_size',
      'environmental_variables_location', 'volume_label',
      'machine_identifier', 'local_path', 'network_path',
      'relative_path', 'working_directory'
   ]

现在,借助以下代码,我们将通过创建一个函数来遍历**lnk**文件,如下所示:

parsed_lnks = []

for entry in lnk_files:
   lnk = open_file_as_lnk(entry[2])
   lnk_data = {'lnk_path': entry[1], 'lnk_name': entry[0]}
   
   for col in columns:
      lnk_data[col] = getattr(lnk, col, "N/A")
   lnk.close()
   parsed_lnks.append(lnk_data)
write_csv(report, columns + ['lnk_path', 'lnk_name'], parsed_lnks)

现在,我们需要定义两个函数,一个将打开**pytsk**文件对象,另一个将用于写入CSV报告,如下所示:

def open_file_as_lnk(lnk_file):
   file_size = lnk_file.info.meta.size
   file_content = lnk_file.read_random(0, file_size)
   file_like_obj = StringIO.StringIO(file_content)
   lnk = pylnk.file()
   lnk.open_file_object(file_like_obj)
   return lnk
def write_csv(outfile, fieldnames, data):
   with open(outfile, 'wb') as open_outfile:
      csvfile = csv.DictWriter(open_outfile, fieldnames)
      csvfile.writeheader()
      csvfile.writerows(data)

运行上述脚本后,我们将获得发现的**lnk**文件中包含的信息,这些信息将保存在CSV报告中:

预取文件

每当从特定位置首次运行应用程序时,Windows都会创建**预取文件**。这些用于加快应用程序启动过程。这些文件的扩展名为**.PF**,并存储在**”\Root\Windows\Prefetch”**文件夹中。

数字取证专家可以揭示从指定位置执行程序的证据以及用户的详细信息。预取文件对于检查员来说是有用的工件,因为即使在程序被删除或卸载后,其条目仍然存在。

让我们讨论一下将从Windows预取文件中获取信息的Python脚本,如下所示:

对于Python脚本,安装第三方模块,即**pylnk、pytsk3**和**unicodecsv**。回想一下,我们已经在前面章节中讨论的Python脚本中使用过这些库。

我们必须按照以下步骤从**prefetch**文件中提取信息:

  • 首先,扫描**.pf**扩展名文件或预取文件。

  • 现在,执行签名验证以消除误报。

  • 接下来,解析Windows预取文件格式。这与Windows版本不同。例如,对于Windows XP,它是17,对于Windows Vista和Windows 7,它是23,对于Windows 8.1,它是26,对于Windows 10,它是30。

  • 最后,我们将解析后的结果写入CSV文件。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下Python库 -

from __future__ import print_function
import argparse
from datetime import datetime, timedelta

import os
import pytsk3
import pyewf
import struct
import sys
import unicodecsv as csv
from utility.pytskutil import TSKUtil

现在,为命令行处理程序提供参数。这里它将接受两个参数,第一个是证据文件的路径,第二个是证据文件的类型。它还接受一个可选参数,用于指定扫描预取文件的路径:

if __name__ == "__main__":
   parser = argparse.ArgumentParser('Parsing Prefetch files')
   parser.add_argument("EVIDENCE_FILE", help = "Evidence file path")
   parser.add_argument("TYPE", help = "Type of Evidence",choices = ("raw", "ewf"))
   parser.add_argument("OUTPUT_CSV", help = "Path to write output csv")
   parser.add_argument("-d", help = "Prefetch directory to scan",default = "/WINDOWS/PREFETCH")
   args = parser.parse_args()
   
   if os.path.exists(args.EVIDENCE_FILE) and \
      os.path.isfile(args.EVIDENCE_FILE):
   main(args.EVIDENCE_FILE, args.TYPE, args.OUTPUT_CSV, args.d)
else:
   print("[-] Supplied input file {} does not exist or is not a ""file".format(args.EVIDENCE_FILE))
   sys.exit(1)

现在,通过创建**TSKUtil**对象来解释证据文件,并遍历文件系统以查找以**.pf**结尾的文件。这可以通过定义**main()**函数来完成,如下所示:

def main(evidence, image_type, output_csv, path):
   tsk_util = TSKUtil(evidence, image_type)
   prefetch_dir = tsk_util.query_directory(path)
   prefetch_files = None
   
   if prefetch_dir is not None:
      prefetch_files = tsk_util.recurse_files(".pf", path=path, logic="endswith")
   
   if prefetch_files is None:
      print("[-] No .pf files found")
      sys.exit(2)
   print("[+] Identified {} potential prefetch files".format(len(prefetch_files)))
   prefetch_data = []
   
   for hit in prefetch_files:
      prefetch_file = hit[2]
      pf_version = check_signature(prefetch_file)

现在,定义一个方法,该方法将执行签名的验证,如下所示:

def check_signature(prefetch_file):
   version, signature = struct.unpack("^<2i", prefetch_file.read_random(0, 8))
   
   if signature == 1094927187:
      return version
   else:
      return None
   
   if pf_version is None:
      continue
   pf_name = hit[0]
   
   if pf_version == 17:
      parsed_data = parse_pf_17(prefetch_file, pf_name)
      parsed_data.append(os.path.join(path, hit[1].lstrip("//")))
      prefetch_data.append(parsed_data)

现在,开始处理Windows预取文件。这里我们以Windows XP预取文件为例:

def parse_pf_17(prefetch_file, pf_name):
   create = convert_unix(prefetch_file.info.meta.crtime)
   modify = convert_unix(prefetch_file.info.meta.mtime)
def convert_unix(ts):
   if int(ts) == 0:
      return ""
   return datetime.utcfromtimestamp(ts)
def convert_filetime(ts):
   if int(ts) == 0:
      return ""
   return datetime(1601, 1, 1) + timedelta(microseconds=ts / 10)

现在,使用struct提取嵌入在预取文件中的数据,如下所示:

pf_size, name, vol_info, vol_entries, vol_size, filetime, \
   count = struct.unpack("<i60s32x3iq16xi",prefetch_file.read_random(12, 136))
name = name.decode("utf-16", "ignore").strip("/x00").split("/x00")[0]

vol_name_offset, vol_name_length, vol_create, \
   vol_serial = struct.unpack("<2iqi",prefetch_file.read_random(vol_info, 20))
   vol_serial = hex(vol_serial).lstrip("0x")
   vol_serial = vol_serial[:4] + "-" + vol_serial[4:]
   vol_name = struct.unpack(
      "<{}s".format(2 * vol_name_length),
      prefetch_file.read_random(vol_info + vol_name_offset,vol_name_length * 2))[0]

vol_name = vol_name.decode("utf-16", "ignore").strip("/x00").split("/x00")[0]
return [
   pf_name, name, pf_size, create,
   modify, convert_filetime(filetime), count, vol_name,
   convert_filetime(vol_create), vol_serial ]

我们提供了Windows XP的预取版本,但如果遇到其他Windows的预取版本会怎样?然后它必须显示一条错误消息,如下所示:

elif pf_version == 23:
   print("[-] Windows Vista / 7 PF file {} -- unsupported".format(pf_name))
   continue
elif pf_version == 26:
   print("[-] Windows 8 PF file {} -- unsupported".format(pf_name))
   continue
elif pf_version == 30:
   print("[-] Windows 10 PF file {} -- unsupported".format(pf_name))
continue

else:
   print("[-] Signature mismatch - Name: {}\nPath: {}".format(hit[0], hit[1]))
continue
write_output(prefetch_data, output_csv)

现在,定义将结果写入CSV报告的方法,如下所示:

def write_output(data, output_csv):
   print("[+] Writing csv report")
   with open(output_csv, "wb") as outfile:
      writer = csv.writer(outfile)
      writer.writerow([
         "File Name", "Prefetch Name", "File Size (bytes)",
         "File Create Date (UTC)", "File Modify Date (UTC)",
         "Prefetch Last Execution Date (UTC)",
         "Prefetch Execution Count", "Volume", "Volume Create Date",
         "Volume Serial", "File Path" ])
      writer.writerows(data)

运行上述脚本后,我们将获得Windows XP版本预取文件中的信息,这些信息将保存在电子表格中。

Windows重要工件-III

本章将解释调查人员在对Windows进行取证分析期间可以获得的其他工件。

事件日志

Windows事件日志文件,顾名思义,是存储重要事件的特殊文件,例如用户何时登录计算机、程序何时遇到错误、系统更改、RDP访问、应用程序特定事件等。网络调查人员始终对事件日志信息感兴趣,因为它提供了大量关于系统访问的有用历史信息。在下面的Python脚本中,我们将处理旧版和当前的Windows事件日志格式。

对于 Python 脚本,我们需要安装以下第三方模块:**pytsk3、pyewf、unicodecsv、pyevt 和 pyevt**x。我们可以按照以下步骤从事件日志中提取信息:

  • 首先,搜索所有与输入参数匹配的事件日志。

  • 然后,执行文件签名验证。

  • 现在,使用相应的库处理找到的每个事件日志。

  • 最后,将输出写入电子表格。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下Python库 -

from __future__ import print_function
import argparse
import unicodecsv as csv
import os
import pytsk3
import pyewf
import pyevt
import pyevtx
import sys
from utility.pytskutil import TSKUtil

现在,提供命令行处理程序的参数。请注意,这里将接受三个参数:第一个是证据文件路径,第二个是证据文件类型,第三个是要处理的事件日志名称。

if __name__ == "__main__":
   parser = argparse.ArgumentParser('Information from Event Logs')
   parser.add_argument("EVIDENCE_FILE", help = "Evidence file path")
   parser.add_argument("TYPE", help = "Type of Evidence",choices = ("raw", "ewf"))
   parser.add_argument(
      "LOG_NAME",help = "Event Log Name (SecEvent.Evt, SysEvent.Evt, ""etc.)")
   
   parser.add_argument(
      "-d", help = "Event log directory to scan",default = "/WINDOWS/SYSTEM32/WINEVT")
   
   parser.add_argument(
      "-f", help = "Enable fuzzy search for either evt or"" evtx extension", action = "store_true")
   args = parser.parse_args()
   
   if os.path.exists(args.EVIDENCE_FILE) and \ os.path.isfile(args.EVIDENCE_FILE):
      main(args.EVIDENCE_FILE, args.TYPE, args.LOG_NAME, args.d, args.f)
   else:
      print("[-] Supplied input file {} does not exist or is not a ""file".format(args.EVIDENCE_FILE))
   sys.exit(1)

现在,通过创建我们的 **TSKUtil** 对象来与事件日志交互,以查询用户提供的路径是否存在。这可以通过以下 **main()** 方法完成:

def main(evidence, image_type, log, win_event, fuzzy):
   tsk_util = TSKUtil(evidence, image_type)
   event_dir = tsk_util.query_directory(win_event)
   
   if event_dir is not None:
      if fuzzy is True:
         event_log = tsk_util.recurse_files(log, path=win_event)
   else:
      event_log = tsk_util.recurse_files(log, path=win_event, logic="equal")
   
   if event_log is not None:
      event_data = []
      for hit in event_log:
         event_file = hit[2]
         temp_evt = write_file(event_file)

现在,我们需要执行签名验证,然后定义一个方法将整个内容写入当前目录:

def write_file(event_file):
   with open(event_file.info.name.name, "w") as outfile:
      outfile.write(event_file.read_random(0, event_file.info.meta.size))
   return event_file.info.name.name
      if pyevt.check_file_signature(temp_evt):
         evt_log = pyevt.open(temp_evt)
         print("[+] Identified {} records in {}".format(
            evt_log.number_of_records, temp_evt))
         
         for i, record in enumerate(evt_log.records):
            strings = ""
            for s in record.strings:
               if s is not None:
                  strings += s + "\n"
            event_data.append([
               i, hit[0], record.computer_name,
               record.user_security_identifier,
               record.creation_time, record.written_time,
               record.event_category, record.source_name,
               record.event_identifier, record.event_type,
               strings, "",
               os.path.join(win_event, hit[1].lstrip("//"))
            ])
      elif pyevtx.check_file_signature(temp_evt):
         evtx_log = pyevtx.open(temp_evt)
         print("[+] Identified {} records in {}".format(
            evtx_log.number_of_records, temp_evt))
         for i, record in enumerate(evtx_log.records):
            strings = ""
            for s in record.strings:
			   if s is not None:
               strings += s + "\n"
         event_data.append([
            i, hit[0], record.computer_name,
            record.user_security_identifier, "",
            record.written_time, record.event_level,
            record.source_name, record.event_identifier,
            "", strings, record.xml_string,
            os.path.join(win_event, hit[1].lstrip("//"))
      ])
      else:
         print("[-] {} not a valid event log. Removing temp" file...".format(temp_evt))
         os.remove(temp_evt)
      continue
      write_output(event_data)
   else:
      print("[-] {} Event log not found in {} directory".format(log, win_event))
      sys.exit(3)
else:
   print("[-] Win XP Event Log Directory {} not found".format(win_event))
   sys.exit(2

最后,定义一个将输出写入电子表格的方法,如下所示:

def write_output(data):
   output_name = "parsed_event_logs.csv"
   print("[+] Writing {} to current working directory: {}".format(
      output_name, os.getcwd()))
   
   with open(output_name, "wb") as outfile:
      writer = csv.writer(outfile)
      writer.writerow([
         "Index", "File name", "Computer Name", "SID",
         "Event Create Date", "Event Written Date",
         "Event Category/Level", "Event Source", "Event ID",
         "Event Type", "Data", "XML Data", "File Path"
      ])
      writer.writerows(data)

成功运行上述脚本后,我们将获得电子表格中事件日志的信息。

互联网历史记录

互联网历史记录对于取证分析师非常有用;因为大多数网络犯罪仅发生在互联网上。让我们看看如何从 Internet Explorer 中提取互联网历史记录,因为我们正在讨论 Windows 取证,而 Internet Explorer 是 Windows 的默认浏览器。

在 Internet Explorer 中,互联网历史记录保存在 **index.dat** 文件中。让我们看看一个 Python 脚本,它将从 **index.dat** 文件中提取信息。

我们可以按照以下步骤从 **index.dat** 文件中提取信息:

  • 首先,在系统中搜索 **index.dat** 文件。

  • 然后,通过遍历它们来提取文件中的信息。

  • 现在,将所有这些信息写入 CSV 报告。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下Python库 -

from __future__ import print_function
import argparse

from datetime import datetime, timedelta
import os
import pytsk3
import pyewf
import pymsiecf
import sys
import unicodecsv as csv

from utility.pytskutil import TSKUtil

现在,提供命令行处理程序的参数。请注意,这里将接受两个参数:第一个是证据文件路径,第二个是证据文件类型。

if __name__ == "__main__":
parser = argparse.ArgumentParser('getting information from internet history')
   parser.add_argument("EVIDENCE_FILE", help = "Evidence file path")
   parser.add_argument("TYPE", help = "Type of Evidence",choices = ("raw", "ewf"))
   parser.add_argument("-d", help = "Index.dat directory to scan",default = "/USERS")
   args = parser.parse_args()
   
   if os.path.exists(args.EVIDENCE_FILE) and os.path.isfile(args.EVIDENCE_FILE):
      main(args.EVIDENCE_FILE, args.TYPE, args.d)
   else:
      print("[-] Supplied input file {} does not exist or is not a ""file".format(args.EVIDENCE_FILE))
      sys.exit(1)

现在,通过创建 **TSKUtil** 对象来解释证据文件,并遍历文件系统以查找 index.dat 文件。这可以通过定义如下 **main()** 函数来完成:

def main(evidence, image_type, path):
   tsk_util = TSKUtil(evidence, image_type)
   index_dir = tsk_util.query_directory(path)
   
   if index_dir is not None:
      index_files = tsk_util.recurse_files("index.dat", path = path,logic = "equal")
      
      if index_files is not None:
         print("[+] Identified {} potential index.dat files".format(len(index_files)))
         index_data = []
         
         for hit in index_files:
            index_file = hit[2]
            temp_index = write_file(index_file)

现在,定义一个函数,借助该函数我们可以将 index.dat 文件的信息复制到当前工作目录,稍后它们可以由第三方模块进行处理:

def write_file(index_file):
   with open(index_file.info.name.name, "w") as outfile:
   outfile.write(index_file.read_random(0, index_file.info.meta.size))
return index_file.info.name.name

现在,使用以下代码执行签名验证,借助内置函数 **check_file_signature()**:

if pymsiecf.check_file_signature(temp_index):
   index_dat = pymsiecf.open(temp_index)
   print("[+] Identified {} records in {}".format(
   index_dat.number_of_items, temp_index))

   for i, record in enumerate(index_dat.items):
   try:
      data = record.data
   if data is not None:
      data = data.rstrip("\x00")
   except AttributeError:
   
   if isinstance(record, pymsiecf.redirected):
      index_data.append([
         i, temp_index, "", "", "", "", "",record.location, "", "", record.offset,os.path.join(path, hit[1].lstrip("//"))])
   
   elif isinstance(record, pymsiecf.leak):
      index_data.append([
         i, temp_index, record.filename, "","", "", "", "", "", "", record.offset,os.path.join(path, hit[1].lstrip("//"))])
   continue
   
   index_data.append([
      i, temp_index, record.filename,
      record.type, record.primary_time,
      record.secondary_time,
      record.last_checked_time, record.location,
      record.number_of_hits, data, record.offset,
      os.path.join(path, hit[1].lstrip("//"))
   ])
   else:
      print("[-] {} not a valid index.dat file. Removing "
      "temp file..".format(temp_index))
      os.remove("index.dat")
      continue
      os.remove("index.dat")
      write_output(index_data)
   else:
      print("[-] Index.dat files not found in {} directory".format(path))
   sys.exit(3)
   else:
      print("[-] Directory {} not found".format(win_event))
   sys.exit(2)

现在,定义一个方法,该方法将输出打印到 CSV 文件中,如下所示:

def write_output(data):
   output_name = "Internet_Indexdat_Summary_Report.csv"
   print("[+] Writing {} with {} parsed index.dat files to current "
   "working directory: {}".format(output_name, len(data),os.getcwd()))
   
   with open(output_name, "wb") as outfile:
      writer = csv.writer(outfile)
      writer.writerow(["Index", "File Name", "Record Name",
      "Record Type", "Primary Date", "Secondary Date",
      "Last Checked Date", "Location", "No. of Hits",
      "Record Data", "Record Offset", "File Path"])
      writer.writerows(data)

运行上述脚本后,我们将获得 CSV 文件中 index.dat 文件的信息。

卷影副本

卷影副本是 Windows 中包含的技术,用于手动或自动备份或快照计算机文件。它也称为卷快照服务或卷影服务 (VSS)。

借助这些 VSS 文件,取证专家可以获得有关系统如何随时间推移而变化以及计算机上存在哪些文件的一些历史信息。卷影副本技术要求文件系统为 NTFS,才能创建和存储卷影副本。

在本节中,我们将看到一个 Python 脚本,它有助于访问取证映像中存在的任何卷影副本卷。

对于 Python 脚本,我们需要安装以下第三方模块:**pytsk3、pyewf、unicodecsv、pyvshadow** 和 **vss**。我们可以按照以下步骤从 VSS 文件中提取信息

  • 首先,访问原始映像的卷并识别所有 NTFS 分区。

  • 然后,通过遍历这些卷影副本来提取它们的信息。

  • 现在,最后我们需要创建快照内数据的列表文件。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下Python库 -

from __future__ import print_function
import argparse
from datetime import datetime, timedelta

import os
import pytsk3
import pyewf
import pyvshadow
import sys
import unicodecsv as csv

from utility import vss
from utility.pytskutil import TSKUtil
from utility import pytskutil

现在,提供命令行处理程序的参数。这里将接受两个参数:第一个是证据文件路径,第二个是输出文件。

if __name__ == "__main__":
   parser = argparse.ArgumentParser('Parsing Shadow Copies')
   parser.add_argument("EVIDENCE_FILE", help = "Evidence file path")
   parser.add_argument("OUTPUT_CSV", help = "Output CSV with VSS file listing")
   args = parser.parse_args()

现在,验证输入文件路径的存在,并从输出文件中分离目录。

directory = os.path.dirname(args.OUTPUT_CSV)
if not os.path.exists(directory) and directory != "":
   os.makedirs(directory)
if os.path.exists(args.EVIDENCE_FILE) and \ os.path.isfile(args.EVIDENCE_FILE):
   main(args.EVIDENCE_FILE, args.OUTPUT_CSV)
else:
   print("[-] Supplied input file {} does not exist or is not a "
   "file".format(args.EVIDENCE_FILE))
   
   sys.exit(1)

现在,通过创建 **TSKUtil** 对象来与证据文件的卷进行交互。这可以通过以下 **main()** 方法完成:

def main(evidence, output):
   tsk_util = TSKUtil(evidence, "raw")
   img_vol = tsk_util.return_vol()

if img_vol is not None:
   for part in img_vol:
      if tsk_util.detect_ntfs(img_vol, part):
         print("Exploring NTFS Partition for VSS")
         explore_vss(evidence, part.start * img_vol.info.block_size,output)
      else:
         print("[-] Must be a physical preservation to be compatible ""with this script")
         sys.exit(2)

现在,定义一个方法来探索已解析的卷影文件,如下所示:

def explore_vss(evidence, part_offset, output):
   vss_volume = pyvshadow.volume()
   vss_handle = vss.VShadowVolume(evidence, part_offset)
   vss_count = vss.GetVssStoreCount(evidence, part_offset)
   
   if vss_count > 0:
      vss_volume.open_file_object(vss_handle)
      vss_data = []
      
      for x in range(vss_count):
         print("Gathering data for VSC {} of {}".format(x, vss_count))
         vss_store = vss_volume.get_store(x)
         image = vss.VShadowImgInfo(vss_store)
         vss_data.append(pytskutil.openVSSFS(image, x))
write_csv(vss_data, output)

最后,定义一个将结果写入电子表格的方法,如下所示:

def write_csv(data, output):
   if data == []:
      print("[-] No output results to write")
      sys.exit(3)
   print("[+] Writing output to {}".format(output))
   if os.path.exists(output):
      append = True
with open(output, "ab") as csvfile:
      csv_writer = csv.writer(csvfile)
      headers = ["VSS", "File", "File Ext", "File Type", "Create Date",
         "Modify Date", "Change Date", "Size", "File Path"]
      if not append:
         csv_writer.writerow(headers)
      for result_list in data:
         csv_writer.writerows(result_list)

成功运行此 Python 脚本后,我们将获得驻留在 VSS 中的信息到电子表格中。

基于日志工件的调查

到目前为止,我们已经了解了如何使用 Python 获取 Windows 中的工件。在本章中,让我们学习如何使用 Python 对基于日志的工件进行调查。

简介

基于日志的工件是信息宝库,对于数字取证专家非常有用。尽管我们有各种监控软件来收集信息,但从这些软件中解析有用信息的主要问题是我们需要大量数据。

各种基于日志的工件和 Python 中的调查

在本节中,让我们讨论各种基于日志的工件及其在 Python 中的调查:

时间戳

时间戳传达日志中活动的日期和时间。它是任何日志文件的重要元素之一。请注意,这些日期和时间值可以采用各种格式。

下面显示的 Python 脚本将以原始日期时间作为输入,并提供格式化的时间戳作为输出。

对于此脚本,我们需要遵循以下步骤:

  • 首先,设置将获取原始数据值以及数据源和数据类型的参数。

  • 现在,提供一个类,为跨不同日期格式的数据提供通用接口。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下 Python 模块:

from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from datetime import datetime as dt
from datetime import timedelta

现在,像往常一样,我们需要为命令行处理程序提供参数。这里将接受三个参数,第一个是要处理的日期值,第二个是该日期值的来源,第三个是其类型:

if __name__ == '__main__':
   parser = ArgumentParser('Timestamp Log-based artifact')
   parser.add_argument("date_value", help="Raw date value to parse")
   parser.add_argument(
      "source", help = "Source format of date",choices = ParseDate.get_supported_formats())
   parser.add_argument(
      "type", help = "Data type of input value",choices = ('number', 'hex'), default = 'int')
   
   args = parser.parse_args()
   date_parser = ParseDate(args.date_value, args.source, args.type)
   date_parser.run()
   print(date_parser.timestamp)

现在,我们需要定义一个类,它将接受日期值、日期源和值类型作为参数:

class ParseDate(object):
   def __init__(self, date_value, source, data_type):
      self.date_value = date_value
      self.source = source
      self.data_type = data_type
      self.timestamp = None

现在,我们将定义一个方法,该方法将充当控制器,就像 **main()** 方法一样:

def run(self):
   if self.source == 'unix-epoch':
      self.parse_unix_epoch()
   elif self.source == 'unix-epoch-ms':
      self.parse_unix_epoch(True)
   elif self.source == 'windows-filetime':
      self.parse_windows_filetime()
@classmethod
def get_supported_formats(cls):
   return ['unix-epoch', 'unix-epoch-ms', 'windows-filetime']

现在,我们需要定义两个方法,分别处理 Unix 时间戳和 FILETIME:

def parse_unix_epoch(self, milliseconds=False):
   if self.data_type == 'hex':
      conv_value = int(self.date_value)
      if milliseconds:
         conv_value = conv_value / 1000.0
   elif self.data_type == 'number':
      conv_value = float(self.date_value)
      if milliseconds:
         conv_value = conv_value / 1000.0
   else:
      print("Unsupported data type '{}' provided".format(self.data_type))
      sys.exit('1')
   ts = dt.fromtimestamp(conv_value)
   self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
def parse_windows_filetime(self):
   if self.data_type == 'hex':
      microseconds = int(self.date_value, 16) / 10.0
   elif self.data_type == 'number':
      microseconds = float(self.date_value) / 10
   else:
      print("Unsupported data type '{}'   provided".format(self.data_type))
      sys.exit('1')
   ts = dt(1601, 1, 1) + timedelta(microseconds=microseconds)
   self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')

运行上述脚本后,通过提供时间戳,我们可以获得易于阅读格式的转换值。

Web 服务器日志

从数字取证专家的角度来看,Web 服务器日志是另一个重要的工件,因为它们可以获取有用的用户统计信息以及有关用户和地理位置的信息。以下是将处理 Web 服务器日志后创建电子表格的 Python 脚本,以便轻松分析信息。

首先,我们需要导入以下 Python 模块:

from __future__ import print_function
from argparse import ArgumentParser, FileType

import re
import shlex
import logging
import sys
import csv

logger = logging.getLogger(__file__)

现在,我们需要定义将从日志中解析的模式:

iis_log_format = [
   ("date", re.compile(r"\d{4}-\d{2}-\d{2}")),
   ("time", re.compile(r"\d\d:\d\d:\d\d")),
   ("s-ip", re.compile(
      r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")),
   ("cs-method", re.compile(
      r"(GET)|(POST)|(PUT)|(DELETE)|(OPTIONS)|(HEAD)|(CONNECT)")),
   ("cs-uri-stem", re.compile(r"([A-Za-z0-1/\.-]*)")),
   ("cs-uri-query", re.compile(r"([A-Za-z0-1/\.-]*)")),
   ("s-port", re.compile(r"\d*")),
   ("cs-username", re.compile(r"([A-Za-z0-1/\.-]*)")),
   ("c-ip", re.compile(
      r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")),
   ("cs(User-Agent)", re.compile(r".*")),
   ("sc-status", re.compile(r"\d*")),
   ("sc-substatus", re.compile(r"\d*")),
   ("sc-win32-status", re.compile(r"\d*")),
   ("time-taken", re.compile(r"\d*"))]

现在,为命令行处理程序提供参数。这里将接受两个参数,第一个是要处理的 IIS 日志,第二个是所需的 CSV 文件路径。

if __name__ == '__main__':
   parser = ArgumentParser('Parsing Server Based Logs')
   parser.add_argument('iis_log', help = "Path to IIS Log",type = FileType('r'))
   parser.add_argument('csv_report', help = "Path to CSV report")
   parser.add_argument('-l', help = "Path to processing log",default=__name__ + '.log')
   args = parser.parse_args()
   logger.setLevel(logging.DEBUG)
   msg_fmt = logging.Formatter(
      "%(asctime)-15s %(funcName)-10s ""%(levelname)-8s %(message)s")
   
   strhndl = logging.StreamHandler(sys.stdout)
   strhndl.setFormatter(fmt = msg_fmt)
   fhndl = logging.FileHandler(args.log, mode = 'a')
   fhndl.setFormatter(fmt = msg_fmt)
   
   logger.addHandler(strhndl)
   logger.addHandler(fhndl)
   logger.info("Starting IIS Parsing ")
   logger.debug("Supplied arguments: {}".format(", ".join(sys.argv[1:])))
   logger.debug("System " + sys.platform)
   logger.debug("Version " + sys.version)
   main(args.iis_log, args.csv_report, logger)
   iologger.info("IIS Parsing Complete")

现在,我们需要定义 **main()** 方法,该方法将处理批量日志信息的脚本:

def main(iis_log, report_file, logger):
   parsed_logs = []

for raw_line in iis_log:
   line = raw_line.strip()
   log_entry = {}

if line.startswith("#") or len(line) == 0:
   continue

if '\"' in line:
   line_iter = shlex.shlex(line_iter)
else:
   line_iter = line.split(" ")
   for count, split_entry in enumerate(line_iter):
      col_name, col_pattern = iis_log_format[count]

      if col_pattern.match(split_entry):
         log_entry[col_name] = split_entry
else:
   logger.error("Unknown column pattern discovered. "
      "Line preserved in full below")
      logger.error("Unparsed Line: {}".format(line))
      parsed_logs.append(log_entry)
      
      logger.info("Parsed {} lines".format(len(parsed_logs)))
      cols = [x[0] for x in iis_log_format]
      
      logger.info("Creating report file: {}".format(report_file))
      write_csv(report_file, cols, parsed_logs)
      logger.info("Report created")

最后,我们需要定义一个将输出写入电子表格的方法:

def write_csv(outfile, fieldnames, data):
   with open(outfile, 'w', newline="") as open_outfile:
      csvfile = csv.DictWriter(open_outfile, fieldnames)
      csvfile.writeheader()
      csvfile.writerows(data)

运行上述脚本后,我们将获得电子表格中的基于 Web 服务器的日志。

使用 YARA 扫描重要文件

YARA(Yet Another Recursive Algorithm)是一种旨在用于恶意软件识别和事件响应的模式匹配实用程序。我们将使用 YARA 扫描文件。在以下 Python 脚本中,我们将使用 YARA。

我们可以使用以下命令安装 YARA:

pip install YARA

我们可以按照以下步骤使用 YARA 规则扫描文件:

  • 首先,设置和编译 YARA 规则

  • 然后,扫描单个文件,然后遍历目录以处理各个文件。

  • 最后,我们将结果导出到 CSV。

Python 代码

让我们看看如何为此目的使用 Python 代码:

首先,我们需要导入以下 Python 模块:

from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter

import os
import csv
import yara

接下来,为命令行处理程序提供参数。请注意,这里将接受两个参数:第一个是 YARA 规则的路径,第二个是要扫描的文件。

if __name__ == '__main__':
   parser = ArgumentParser('Scanning files by YARA')
   parser.add_argument(
      'yara_rules',help = "Path to Yara rule to scan with. May be file or folder path.")
   parser.add_argument('path_to_scan',help = "Path to file or folder to scan")
   parser.add_argument('--output',help = "Path to output a CSV report of scan results")
   args = parser.parse_args()
   main(args.yara_rules, args.path_to_scan, args.output)

现在,我们将定义 **main()** 函数,该函数将接受 yara 规则的路径和要扫描的文件:

def main(yara_rules, path_to_scan, output):
   if os.path.isdir(yara_rules):
      yrules = yara.compile(yara_rules)
   else:
      yrules = yara.compile(filepath=yara_rules)
   if os.path.isdir(path_to_scan):
      match_info = process_directory(yrules, path_to_scan)
   else:
      match_info = process_file(yrules, path_to_scan)
   columns = ['rule_name', 'hit_value', 'hit_offset', 'file_name',
   'rule_string', 'rule_tag']
   
   if output is None:
      write_stdout(columns, match_info)
   else:
      write_csv(output, columns, match_info)

现在,定义一个方法,该方法将遍历目录并将结果传递给另一个方法以进行进一步处理:

def process_directory(yrules, folder_path):
   match_info = []
   for root, _, files in os.walk(folder_path):
      for entry in files:
         file_entry = os.path.join(root, entry)
         match_info += process_file(yrules, file_entry)
   return match_info

接下来,定义两个函数。请注意,首先我们将使用 **match()** 方法到 **yrules** 对象,另一个将匹配信息报告到控制台(如果用户未指定任何输出文件)。观察下面显示的代码:

def process_file(yrules, file_path):
   match = yrules.match(file_path)
   match_info = []
   
   for rule_set in match:
      for hit in rule_set.strings:
         match_info.append({
            'file_name': file_path,
            'rule_name': rule_set.rule,
            'rule_tag': ",".join(rule_set.tags),
            'hit_offset': hit[0],
            'rule_string': hit[1],
            'hit_value': hit[2]
         })
   return match_info
def write_stdout(columns, match_info):
   for entry in match_info:
      for col in columns:
         print("{}: {}".format(col, entry[col]))
   print("=" * 30)

最后,我们将定义一个将输出写入 CSV 文件的方法,如下所示:

def write_csv(outfile, fieldnames, data):
   with open(outfile, 'w', newline="") as open_outfile:
      csvfile = csv.DictWriter(open_outfile, fieldnames)
      csvfile.writeheader()
      csvfile.writerows(data)

成功运行上述脚本后,我们可以在命令行中提供适当的参数,并可以生成 CSV 报告。

广告