- 环境准备
windows10 64bit
python3.7 64bit
打开可执行文件,创建进程
定义变量
以下代码用 ctypes 定义了调用 windows API 需要的结构
- my_debugger_defines.py
import ctypes

# Win32 scalar/pointer aliases used by the structures below.
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# The debugger calls the wide-character CreateProcessW, so the string
# pointers stored in STARTUPINFO are wide-character pointers as well.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
HANDLE = ctypes.c_void_p

# dwCreationFlags values for CreateProcess.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010


class STARTUPINFO(ctypes.Structure):
    """Startup parameters handed to CreateProcessW (Win32 STARTUPINFO).

    Field order and types follow the Win32 declaration.  Three field names
    keep the spelling this listing has always used (``dwXcountChars``,
    ``dwYcountChars``, ``hStdIput``); the Win32 names are ``dwXCountChars``,
    ``dwYCountChars`` and ``hStdInput``.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """Handles and ids filled in by CreateProcess (Win32 PROCESS_INFORMATION)."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
调用CreateProcess
值得注意的是,原版用的是python2,是ascii编码,这里使用的是python3,是Unicode编码,所以使用 CreateProcessW 函数
- my_debugger.py
import ctypes

import my_debugger_defines

# A private WinDLL instance created with use_last_error=True makes ctypes
# save the thread's last-error code right after every call, so it can be read
# back reliably with ctypes.get_last_error().
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)


class debugger():
    """Minimal debugger: it can only launch an executable under debug control."""

    def __init__(self):
        pass

    def load(self, path_to_exe):
        """Start ``path_to_exe`` as a debuggee and print its PID.

        Args:
            path_to_exe: path of the executable, passed to CreateProcessW as
                the application name.

        Prints the Win32 error code when the process cannot be created.
        """
        # DEBUG_PROCESS asks Windows to start the new process as our debuggee.
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        # dwFlags/wShowWindow values are taken unchanged from the original
        # listing; cb must hold the size of the structure itself.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        created = kernel32.CreateProcessW(
            path_to_exe,
            None,
            None,
            None,
            None,
            creation_flags,
            None,
            None,
            ctypes.byref(startupinfo),
            ctypes.byref(process_information),
        )
        if created:
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
        else:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())
- my_test.py
import my_debugger

# Launch 11.exe under the debugger and report its PID.
debugger = my_debugger.debugger()
debugger.load("11.exe")
打开pid对应的进程
一些需要的结构体
- my_debugger_defines.py
import ctypes

# Win32 scalar/pointer aliases used by the structures below.
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# The debugger calls the wide-character CreateProcessW, so the string
# pointers stored in STARTUPINFO are wide-character pointers as well.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer.  c_ulong is only 32 bits wide on 64-bit
# Windows, which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller
# than the buffer WaitForDebugEvent fills in.
UINT_PTR = ctypes.c_size_t

# dwCreationFlags values for CreateProcess.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010

# Access mask passed to OpenProcess.
PROCESS_ALL_ACCESS = 0x001F0FFF

# Continue status for ContinueDebugEvent.
DBG_CONTINUE = 0x00010002

# Timeout meaning "wait forever" (was a copy of DBG_CONTINUE, i.e. ~65 s).
INFINITE = 0xFFFFFFFF


class STARTUPINFO(ctypes.Structure):
    """Startup parameters handed to CreateProcessW (Win32 STARTUPINFO).

    Field order and types follow the Win32 declaration.  Three field names
    keep the spelling this listing has always used (``dwXcountChars``,
    ``dwYcountChars``, ``hStdIput``); the Win32 names are ``dwXCountChars``,
    ``dwYCountChars`` and ``hStdInput``.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """Handles and ids filled in by CreateProcess (Win32 PROCESS_INFORMATION)."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """Description of one exception (Win32 EXCEPTION_RECORD)."""


# _fields_ is assigned after the class statement because the structure holds
# a pointer to its own type (the chained exception record).
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of an exception debug event."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Per-event payload of DEBUG_EVENT; only the exception member is modelled.

    Members of the Win32 union not declared here: CreateThread,
    CreateProcessInfo, ExitThread, ExitProcess, LoadDll, UnloadDll,
    DebugString and RipInfo.
    NOTE(review): this relies on EXCEPTION_DEBUG_INFO being the largest
    member of the Win32 union, so the union keeps its full size - confirm
    against the SDK headers.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """Event record filled in by WaitForDebugEvent."""

    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]
要实现附加,需要:
打开进程,获得进程的所有权限。
让进程等待调试事件发生。
调试事件发生,要处理调试事件(这里还没有处理调试事件)。
处理完调试事件,下一步继续执行。
解除附加。
如下代码,有比较详细的注释
- my_debugger.py
import ctypes

import my_debugger_defines

# A private WinDLL instance: use_last_error=True makes ctypes save the last
# error code after every call (read it with ctypes.get_last_error()), and the
# prototypes declared below do not leak into the shared ctypes.windll.kernel32.
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

# Without a prototype ctypes treats the return value as a C int, which would
# truncate a 64-bit process handle.
kernel32.OpenProcess.argtypes = [
    my_debugger_defines.DWORD,
    ctypes.c_int,
    my_debugger_defines.DWORD,
]
kernel32.OpenProcess.restype = my_debugger_defines.HANDLE


class debugger():
    """Debugger that can launch a process or attach to a running one."""

    def __init__(self):
        self.h_process = None         # handle returned by open_process()
        self.pid = None               # id of the debuggee
        self.debugger_active = False  # run() loops while this is True

    def load(self, path_to_exe):
        """Start ``path_to_exe`` as a debuggee and keep a handle to it.

        Args:
            path_to_exe: path of the executable, passed to CreateProcessW as
                the application name.

        Prints the Win32 error code when the process cannot be created.
        """
        # DEBUG_PROCESS asks Windows to start the new process as our debuggee.
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        # dwFlags/wShowWindow values are taken unchanged from the original
        # listing; cb must hold the size of the structure itself.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        created = kernel32.CreateProcessW(
            path_to_exe,
            None,
            None,
            None,
            None,
            creation_flags,
            None,
            None,
            ctypes.byref(startupinfo),
            ctypes.byref(process_information),
        )
        if not created:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Remember the debuggee so later calls (detach) know which process
        # to act on, and keep a full-access handle for later use.
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(process_information.dwProcessId)

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for ``pid`` (None on failure)."""
        return kernel32.OpenProcess(
            my_debugger_defines.PROCESS_ALL_ACCESS, False, pid
        )

    def attach(self, pid):
        """Attach to the running process ``pid`` and enter the debug loop."""
        self.h_process = self.open_process(pid)

        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            self.run()
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, pause for the user, then resume the debuggee.

        No event-specific handling is done yet: after the first event the
        debug loop is switched off.
        """
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE

        # Blocks until an event arrives or the INFINITE timeout value elapses.
        if kernel32.WaitForDebugEvent(
            ctypes.byref(debug_event), my_debugger_defines.INFINITE
        ):
            input("press a key to continue...")
            self.debugger_active = False
            # Let the thread that reported the event run again.
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status,
            )

    def detach(self):
        """Stop debugging ``self.pid``; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False
- my_test.py
import my_debugger

# Attach to a running process chosen by the user, then detach again.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))
debugger.detach()
获取寄存器状态信息
这里值得说的是,在 defines 中,如果 python 是 64 位的,CONTEXT 应该用 64 位的版本;如果 python 是 32 位的,CONTEXT 就用老的(32 位)版本。如果在 64 位的调试器中使用老版本的话,就会得到 context 全零的情况。
- my_debugger_defines.py
import ctypes

# Win32 scalar/pointer aliases used by the structures below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# The debugger calls the wide-character CreateProcessW, so the string
# pointers stored in STARTUPINFO are wide-character pointers as well.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer.  c_ulong is only 32 bits wide on 64-bit
# Windows, which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller
# than the buffer WaitForDebugEvent fills in.
UINT_PTR = ctypes.c_size_t

# True when this Python is a 64-bit build; selects the CONTEXT layout/flags.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8

# dwCreationFlags values for CreateProcess.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010

# Access masks passed to OpenProcess / OpenThread.
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# Continue status for ContinueDebugEvent.
DBG_CONTINUE = 0x00010002

# Timeout meaning "wait forever" (was a copy of DBG_CONTINUE, i.e. ~65 s).
INFINITE = 0xFFFFFFFF

# CreateToolhelp32Snapshot flag: include all threads in the snapshot.
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext().  The values carry an architecture
# bit, so the x86 constants must not be used with the x64 CONTEXT layout.
if _IS_64BIT:
    CONTEXT_FULL = 0x0010000B
    CONTEXT_DEBUG_REGISTERS = 0x00100010
else:
    CONTEXT_FULL = 0x00010007
    CONTEXT_DEBUG_REGISTERS = 0x00010010


class STARTUPINFO(ctypes.Structure):
    """Startup parameters handed to CreateProcessW (Win32 STARTUPINFO).

    Field order and types follow the Win32 declaration.  Three field names
    keep the spelling this listing has always used (``dwXcountChars``,
    ``dwYcountChars``, ``hStdIput``); the Win32 names are ``dwXCountChars``,
    ``dwYCountChars`` and ``hStdInput``.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """Handles and ids filled in by CreateProcess (Win32 PROCESS_INFORMATION)."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """Description of one exception (Win32 EXCEPTION_RECORD)."""


# _fields_ is assigned after the class statement because the structure holds
# a pointer to its own type (the chained exception record).
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of an exception debug event."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Per-event payload of DEBUG_EVENT; only the exception member is modelled.

    Members of the Win32 union not declared here: CreateThread,
    CreateProcessInfo, ExitThread, ExitProcess, LoadDll, UnloadDll,
    DebugString and RipInfo.
    NOTE(review): this relies on EXCEPTION_DEBUG_INFO being the largest
    member of the Win32 union, so the union keeps its full size - confirm
    against the SDK headers.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """Event record filled in by WaitForDebugEvent."""

    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    """One thread of a Toolhelp snapshot (Thread32First / Thread32Next)."""

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]


class FLOATING_SAVE_AREA(ctypes.Structure):
    """x87 FPU state embedded in the 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


class CONTEXT32(ctypes.Structure):
    """Thread context layout for 32-bit (x86) Python."""

    _fields_ = [
        ("ContextFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("FloatSave", FLOATING_SAVE_AREA),
        ("SegGs", DWORD),
        ("SegFs", DWORD),
        ("SegEs", DWORD),
        ("SegDs", DWORD),
        ("Edi", DWORD),
        ("Esi", DWORD),
        ("Ebx", DWORD),
        ("Edx", DWORD),
        ("Ecx", DWORD),
        ("Eax", DWORD),
        ("Ebp", DWORD),
        ("Eip", DWORD),
        ("SegCs", DWORD),
        ("EFlags", DWORD),
        ("Esp", DWORD),
        ("SegSs", DWORD),
        ("ExtendedRegisters", BYTE * 512),
    ]


class M128A(ctypes.Structure):
    """128-bit value used for the vector registers of the x64 context."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Floating-point/SSE save area of the x64 context (512 bytes).

    ``S`` is kept only so existing code that referenced it keeps working.
    """

    _fields_ = [
        ("FltSave", BYTE * 512),
        ("S", DWORD * 32),
    ]


class CONTEXT64(ctypes.Structure):
    """Thread context layout for 64-bit (x64) Python.

    The debug registers are 64 bits wide and the save area is 512 bytes;
    declaring them smaller shifts every following register and leaves the
    buffer shorter than what GetThreadContext writes.
    NOTE(review): Win32 expects this structure 16-byte aligned; ctypes does
    not enforce that on Python 3.7 - confirm the allocation is aligned.
    """

    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


# CONTEXT is the layout matching the bitness of the running Python.
CONTEXT = CONTEXT64 if _IS_64BIT else CONTEXT32
这里有详细的注释
- my_debugger.py
import ctypes

import my_debugger_defines

# A private WinDLL instance: use_last_error=True makes ctypes save the last
# error code after every call (read it with ctypes.get_last_error()), and the
# prototypes declared below do not leak into the shared ctypes.windll.kernel32.
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

_HANDLE = my_debugger_defines.HANDLE
_DWORD = my_debugger_defines.DWORD
_BOOL = ctypes.c_int

# Without prototypes ctypes passes and returns handles as C int, which
# truncates them on 64-bit Windows.
kernel32.OpenProcess.argtypes = [_DWORD, _BOOL, _DWORD]
kernel32.OpenProcess.restype = _HANDLE
kernel32.OpenThread.argtypes = [_DWORD, _BOOL, _DWORD]
kernel32.OpenThread.restype = _HANDLE
kernel32.CreateToolhelp32Snapshot.argtypes = [_DWORD, _DWORD]
kernel32.CreateToolhelp32Snapshot.restype = _HANDLE
kernel32.Thread32First.argtypes = [
    _HANDLE, ctypes.POINTER(my_debugger_defines.THREADENTRY32)]
kernel32.Thread32Next.argtypes = [
    _HANDLE, ctypes.POINTER(my_debugger_defines.THREADENTRY32)]
kernel32.GetThreadContext.argtypes = [
    _HANDLE, ctypes.POINTER(my_debugger_defines.CONTEXT)]
kernel32.CloseHandle.argtypes = [_HANDLE]

# CreateToolhelp32Snapshot signals failure with this value, not with NULL.
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value


class debugger():
    """Debugger that can launch/attach to a process and read thread registers."""

    def __init__(self):
        self.h_process = None         # handle returned by open_process()
        self.pid = None               # id of the debuggee
        self.debugger_active = False  # run() loops while this is True

    def load(self, path_to_exe):
        """Start ``path_to_exe`` as a debuggee and keep a handle to it.

        Args:
            path_to_exe: path of the executable, passed to CreateProcessW as
                the application name.

        Prints the Win32 error code when the process cannot be created.
        """
        # DEBUG_PROCESS asks Windows to start the new process as our debuggee.
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        # dwFlags/wShowWindow values are taken unchanged from the original
        # listing; cb must hold the size of the structure itself.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        created = kernel32.CreateProcessW(
            path_to_exe,
            None,
            None,
            None,
            None,
            creation_flags,
            None,
            None,
            ctypes.byref(startupinfo),
            ctypes.byref(process_information),
        )
        if not created:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Remember the debuggee so later calls (detach, enumerate_threads)
        # know which process to act on, and keep a full-access handle.
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(process_information.dwProcessId)

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for ``pid`` (None on failure)."""
        return kernel32.OpenProcess(
            my_debugger_defines.PROCESS_ALL_ACCESS, False, pid
        )

    def attach(self, pid):
        """Attach to the running process ``pid``.

        The debug loop is not started here; call run() for that.
        """
        self.h_process = self.open_process(pid)

        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, pause for the user, then resume the debuggee.

        No event-specific handling is done yet: after the first event the
        debug loop is switched off.
        """
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE

        # WaitForDebugEvent fills debug_event once an event is available.
        if kernel32.WaitForDebugEvent(
            ctypes.byref(debug_event), my_debugger_defines.INFINITE
        ):
            input("press a key to continue...")
            self.debugger_active = False
            # Let the thread that reported the event run again.
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status,
            )

    def detach(self):
        """Stop debugging ``self.pid``; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for ``thread_id``, or False on failure."""
        h_thread = kernel32.OpenThread(
            my_debugger_defines.THREAD_ALL_ACCESS, False, thread_id
        )
        # With restype set to HANDLE a NULL result is reported as None.
        if h_thread is not None:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by ``self.pid``.

        Returns:
            A list of thread ids, or False when the snapshot cannot be taken.
        """
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []

        # A TH32CS_SNAPTHREAD snapshot contains the threads of every process,
        # so each entry's owner has to be compared with our pid below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid or 0
        )
        if snapshot is None or snapshot == INVALID_HANDLE_VALUE:
            return False

        try:
            # dwSize must be set before the first Thread32First call.
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            success = kernel32.Thread32First(
                snapshot, ctypes.byref(thread_entry))
            while success:
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(
                    snapshot, ctypes.byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the register context of a thread, or False on failure.

        Args:
            thread_id: id of the thread; used to open a temporary handle when
                ``h_thread`` is not given.
            h_thread: an already opened thread handle; it is used as is and
                left open for the caller.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (
            my_debugger_defines.CONTEXT_FULL
            | my_debugger_defines.CONTEXT_DEBUG_REGISTERS
        )

        opened_here = h_thread is None
        if opened_here:
            if thread_id is None:
                return False
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False

        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            # Close only handles opened by this call, on success and failure.
            if opened_here:
                kernel32.CloseHandle(h_thread)
- my_test.py
import my_debugger
import win32con
import win32api

# Attach to a process chosen by the user and dump the registers of each of
# its threads.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))

# enumerate_threads() returns False when the snapshot fails; treat that as
# "no threads" instead of iterating over a bool.
thread_ids = debugger.enumerate_threads() or []
for thread in thread_ids:
    thread_context = debugger.get_thread_context(thread)
    print("[*] Dumping registers for thread ID: 0x%08x." % thread)
    # get_thread_context() returns False when the context cannot be read.
    if not thread_context:
        continue
    print("[*] RIP 0x%08x" % thread_context.Rip)
    print("[*] RSP 0x%08x" % thread_context.Rsp)
    print("[*] RBP 0x%08x" % thread_context.Rbp)
    print("[*] RAX 0x%08x" % thread_context.Rax)
    print("[*] RBX 0x%08x" % thread_context.Rbx)
    print("[*] RCX 0x%08x" % thread_context.Rcx)
    print("[*] RDX 0x%08x" % thread_context.Rdx)

debugger.detach()
实现调试事件
- my_debugger_defines.py
import ctypes

# Win32 scalar/pointer aliases used by the structures below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# The debugger calls the wide-character CreateProcessW, so the string
# pointers stored in STARTUPINFO are wide-character pointers as well.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer.  c_ulong is only 32 bits wide on 64-bit
# Windows, which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller
# than the buffer WaitForDebugEvent fills in.
UINT_PTR = ctypes.c_size_t

# True when this Python is a 64-bit build; selects the CONTEXT layout/flags.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8

# dwCreationFlags values for CreateProcess.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010

# Access masks passed to OpenProcess / OpenThread.
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# Continue status for ContinueDebugEvent.
DBG_CONTINUE = 0x00010002

# Timeout meaning "wait forever" (was a copy of DBG_CONTINUE, i.e. ~65 s).
INFINITE = 0xFFFFFFFF

# CreateToolhelp32Snapshot flag: include all threads in the snapshot.
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext().  The values carry an architecture
# bit, so the x86 constants must not be used with the x64 CONTEXT layout.
if _IS_64BIT:
    CONTEXT_FULL = 0x0010000B
    CONTEXT_DEBUG_REGISTERS = 0x00100010
else:
    CONTEXT_FULL = 0x00010007
    CONTEXT_DEBUG_REGISTERS = 0x00010010

# Debug event code: the event carries an exception record.
EXCEPTION_DEBUG_EVENT = 0x1

# Exception codes found in EXCEPTION_RECORD.ExceptionCode.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004


class STARTUPINFO(ctypes.Structure):
    """Startup parameters handed to CreateProcessW (Win32 STARTUPINFO).

    Field order and types follow the Win32 declaration.  Three field names
    keep the spelling this listing has always used (``dwXcountChars``,
    ``dwYcountChars``, ``hStdIput``); the Win32 names are ``dwXCountChars``,
    ``dwYCountChars`` and ``hStdInput``.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """Handles and ids filled in by CreateProcess (Win32 PROCESS_INFORMATION)."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """Description of one exception (Win32 EXCEPTION_RECORD)."""


# _fields_ is assigned after the class statement because the structure holds
# a pointer to its own type (the chained exception record).
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of an exception debug event."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Per-event payload of DEBUG_EVENT; only the exception member is modelled.

    Members of the Win32 union not declared here: CreateThread,
    CreateProcessInfo, ExitThread, ExitProcess, LoadDll, UnloadDll,
    DebugString and RipInfo.
    NOTE(review): this relies on EXCEPTION_DEBUG_INFO being the largest
    member of the Win32 union, so the union keeps its full size - confirm
    against the SDK headers.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """Event record filled in by WaitForDebugEvent."""

    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    """One thread of a Toolhelp snapshot (Thread32First / Thread32Next)."""

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]


class FLOATING_SAVE_AREA(ctypes.Structure):
    """x87 FPU state embedded in the 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


class CONTEXT32(ctypes.Structure):
    """Thread context layout for 32-bit (x86) Python."""

    _fields_ = [
        ("ContextFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("FloatSave", FLOATING_SAVE_AREA),
        ("SegGs", DWORD),
        ("SegFs", DWORD),
        ("SegEs", DWORD),
        ("SegDs", DWORD),
        ("Edi", DWORD),
        ("Esi", DWORD),
        ("Ebx", DWORD),
        ("Edx", DWORD),
        ("Ecx", DWORD),
        ("Eax", DWORD),
        ("Ebp", DWORD),
        ("Eip", DWORD),
        ("SegCs", DWORD),
        ("EFlags", DWORD),
        ("Esp", DWORD),
        ("SegSs", DWORD),
        ("ExtendedRegisters", BYTE * 512),
    ]


class M128A(ctypes.Structure):
    """128-bit value used for the vector registers of the x64 context."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Floating-point/SSE save area of the x64 context (512 bytes).

    ``S`` is kept only so existing code that referenced it keeps working.
    """

    _fields_ = [
        ("FltSave", BYTE * 512),
        ("S", DWORD * 32),
    ]


class CONTEXT64(ctypes.Structure):
    """Thread context layout for 64-bit (x64) Python.

    The debug registers are 64 bits wide and the save area is 512 bytes;
    declaring them smaller shifts every following register and leaves the
    buffer shorter than what GetThreadContext writes.
    NOTE(review): Win32 expects this structure 16-byte aligned; ctypes does
    not enforce that on Python 3.7 - confirm the allocation is aligned.
    """

    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


# CONTEXT is the layout matching the bitness of the running Python.
CONTEXT = CONTEXT64 if _IS_64BIT else CONTEXT32
- my_debugger.py
import ctypes

import my_debugger_defines

# A private WinDLL instance: use_last_error=True makes ctypes save the last
# error code after every call (read it with ctypes.get_last_error()), and the
# prototypes declared below do not leak into the shared ctypes.windll.kernel32.
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

_HANDLE = my_debugger_defines.HANDLE
_DWORD = my_debugger_defines.DWORD
_BOOL = ctypes.c_int

# Without prototypes ctypes passes and returns handles as C int, which
# truncates them on 64-bit Windows.
kernel32.OpenProcess.argtypes = [_DWORD, _BOOL, _DWORD]
kernel32.OpenProcess.restype = _HANDLE
kernel32.OpenThread.argtypes = [_DWORD, _BOOL, _DWORD]
kernel32.OpenThread.restype = _HANDLE
kernel32.CreateToolhelp32Snapshot.argtypes = [_DWORD, _DWORD]
kernel32.CreateToolhelp32Snapshot.restype = _HANDLE
kernel32.Thread32First.argtypes = [
    _HANDLE, ctypes.POINTER(my_debugger_defines.THREADENTRY32)]
kernel32.Thread32Next.argtypes = [
    _HANDLE, ctypes.POINTER(my_debugger_defines.THREADENTRY32)]
kernel32.GetThreadContext.argtypes = [
    _HANDLE, ctypes.POINTER(my_debugger_defines.CONTEXT)]
kernel32.CloseHandle.argtypes = [_HANDLE]

# CreateToolhelp32Snapshot signals failure with this value, not with NULL.
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value


class debugger():
    """Debugger that launches/attaches to a process and reports exception events."""

    def __init__(self):
        self.h_process = None         # handle returned by open_process()
        self.pid = None               # id of the debuggee
        self.debugger_active = False  # run() loops while this is True
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None

    def load(self, path_to_exe):
        """Start ``path_to_exe`` as a debuggee and keep a handle to it.

        Args:
            path_to_exe: path of the executable, passed to CreateProcessW as
                the application name.

        Prints the Win32 error code when the process cannot be created.
        """
        # DEBUG_PROCESS asks Windows to start the new process as our debuggee.
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        # dwFlags/wShowWindow values are taken unchanged from the original
        # listing; cb must hold the size of the structure itself.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        created = kernel32.CreateProcessW(
            path_to_exe,
            None,
            None,
            None,
            None,
            creation_flags,
            None,
            None,
            ctypes.byref(startupinfo),
            ctypes.byref(process_information),
        )
        if not created:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Remember the debuggee so later calls (detach, enumerate_threads)
        # know which process to act on, and keep a full-access handle.
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(process_information.dwProcessId)

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for ``pid`` (None on failure)."""
        return kernel32.OpenProcess(
            my_debugger_defines.PROCESS_ALL_ACCESS, False, pid
        )

    def attach(self, pid):
        """Attach to the running process ``pid``.

        The debug loop is not started here; call run() for that.
        """
        self.h_process = self.open_process(pid)

        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, report it, and resume the debuggee.

        For exception events the exception code and address are stored on
        the instance and dispatched by exception code.
        """
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE

        # WaitForDebugEvent fills debug_event once an event is available.
        if not kernel32.WaitForDebugEvent(
            ctypes.byref(debug_event), my_debugger_defines.INFINITE
        ):
            return

        # Capture the reporting thread's handle and registers for handlers.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        try:
            print("Event Code: %d Thread ID: %d" % (
                debug_event.dwDebugEventCode, debug_event.dwThreadId))

            if (debug_event.dwDebugEventCode
                    == my_debugger_defines.EXCEPTION_DEBUG_EVENT):
                record = debug_event.u.Exception.ExceptionRecord
                exception = record.ExceptionCode
                self.exception = exception
                self.exception_address = record.ExceptionAddress

                if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                    print("Acess Violation Detected.")
                elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                    continue_status = self.exception_handler_breakpoint()
                elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                    print("Guard Page Access Detected.")
                elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                    print("Single Stepping.")

            # Let the thread that reported the event run again.
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status,
            )
        finally:
            # The handle was opened for this event only; release it here so
            # one handle is not leaked per event.
            if self.h_thread:
                kernel32.CloseHandle(self.h_thread)
            self.h_thread = None

    def exception_handler_breakpoint(self):
        """Report a breakpoint exception and return the continue status."""
        print("[*] Inside the breakpoint handler.")
        # ExceptionAddress is a c_void_p field, which reads back as None for
        # a NULL address; "%x" cannot format None.
        print("Exception address: 0x%08x" % (self.exception_address or 0))
        return my_debugger_defines.DBG_CONTINUE

    def detach(self):
        """Stop debugging ``self.pid``; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for ``thread_id``, or False on failure."""
        h_thread = kernel32.OpenThread(
            my_debugger_defines.THREAD_ALL_ACCESS, False, thread_id
        )
        # With restype set to HANDLE a NULL result is reported as None.
        if h_thread is not None:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by ``self.pid``.

        Returns:
            A list of thread ids, or False when the snapshot cannot be taken.
        """
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []

        # A TH32CS_SNAPTHREAD snapshot contains the threads of every process,
        # so each entry's owner has to be compared with our pid below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid or 0
        )
        if snapshot is None or snapshot == INVALID_HANDLE_VALUE:
            return False

        try:
            # dwSize must be set before the first Thread32First call.
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            success = kernel32.Thread32First(
                snapshot, ctypes.byref(thread_entry))
            while success:
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(
                    snapshot, ctypes.byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the register context of a thread, or False on failure.

        Args:
            thread_id: id of the thread; used to open a temporary handle when
                ``h_thread`` is not given.
            h_thread: an already opened thread handle; it is used as is and
                left open for the caller.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (
            my_debugger_defines.CONTEXT_FULL
            | my_debugger_defines.CONTEXT_DEBUG_REGISTERS
        )

        opened_here = h_thread is None
        if opened_here:
            if thread_id is None:
                return False
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False

        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            # Close only handles opened by this call, on success and failure.
            if opened_here:
                kernel32.CloseHandle(h_thread)
- my_test.py
import my_debugger

# Attach to a process chosen by the user, process debug events, then detach.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))
debugger.run()
debugger.detach()
要学习的内容:
DEBUG_EVENT 结构中的联合体和 dwDebugEventCode 的关系:
在get_debug_event
函数中实现了当 dwDebugEventCode 等于 0x1
的情况。
值得说的是:DEBUG_EVENT 联合体中的 Exception.ExceptionRecord.ExceptionCode 有很多取值,其中一个是 EXCEPTION_GUARD_PAGE,但这个取值在微软的学习手册里没有找到,在 VS 的 minwinbase.h 头文件中有定义:
#define EXCEPTION_GUARD_PAGE STATUS_GUARD_PAGE_VIOLATION
软断点(INT3)
这个节坑太多,原文中显示源代码和运行的都不一样。需要看源代码文件去慢慢领悟。
还有就是迁移到python3有很多需要注意的地方,例如语法不同、ctypes调用API的方法不同
- my_debugger_defines.py
import ctypes

# Win32 scalar/pointer aliases used by the structures below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# Wide-character string pointer (the debugger uses the ...W Win32 entry
# points) and narrow-character string pointer.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer.  c_ulong is only 32 bits wide on 64-bit
# Windows, which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller
# than the buffer WaitForDebugEvent fills in.
UINT_PTR = ctypes.c_size_t

# True when this Python is a 64-bit build; selects the CONTEXT layout/flags.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8

# dwCreationFlags values for CreateProcess.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010

# Access masks passed to OpenProcess / OpenThread.
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# Continue status for ContinueDebugEvent.
DBG_CONTINUE = 0x00010002

# Timeout meaning "wait forever" (was a copy of DBG_CONTINUE, i.e. ~65 s).
INFINITE = 0xFFFFFFFF

# CreateToolhelp32Snapshot flag: include all threads in the snapshot.
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext().  The values carry an architecture
# bit, so the x86 constants must not be used with the x64 CONTEXT layout.
if _IS_64BIT:
    CONTEXT_FULL = 0x0010000B
    CONTEXT_DEBUG_REGISTERS = 0x00100010
else:
    CONTEXT_FULL = 0x00010007
    CONTEXT_DEBUG_REGISTERS = 0x00010010

# Debug event code: the event carries an exception record.
EXCEPTION_DEBUG_EVENT = 0x1

# Exception codes found in EXCEPTION_RECORD.ExceptionCode.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004

# Memory page permissions, used by VirtualProtect().
PAGE_EXECUTE_READWRITE = 0x00000040


class STARTUPINFO(ctypes.Structure):
    """Startup parameters handed to CreateProcessW (Win32 STARTUPINFO).

    Field order and types follow the Win32 declaration.  Three field names
    keep the spelling this listing has always used (``dwXcountChars``,
    ``dwYcountChars``, ``hStdIput``); the Win32 names are ``dwXCountChars``,
    ``dwYCountChars`` and ``hStdInput``.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """Handles and ids filled in by CreateProcess (Win32 PROCESS_INFORMATION)."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """Description of one exception (Win32 EXCEPTION_RECORD)."""


# _fields_ is assigned after the class statement because the structure holds
# a pointer to its own type (the chained exception record).
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of an exception debug event."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Per-event payload of DEBUG_EVENT; only the exception member is modelled.

    Members of the Win32 union not declared here: CreateThread,
    CreateProcessInfo, ExitThread, ExitProcess, LoadDll, UnloadDll,
    DebugString and RipInfo.
    NOTE(review): this relies on EXCEPTION_DEBUG_INFO being the largest
    member of the Win32 union, so the union keeps its full size - confirm
    against the SDK headers.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """Event record filled in by WaitForDebugEvent."""

    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    """One thread of a Toolhelp snapshot (Thread32First / Thread32Next)."""

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]


class FLOATING_SAVE_AREA(ctypes.Structure):
    """x87 FPU state embedded in the 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


class CONTEXT32(ctypes.Structure):
    """Thread context layout for 32-bit (x86) Python."""

    _fields_ = [
        ("ContextFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("FloatSave", FLOATING_SAVE_AREA),
        ("SegGs", DWORD),
        ("SegFs", DWORD),
        ("SegEs", DWORD),
        ("SegDs", DWORD),
        ("Edi", DWORD),
        ("Esi", DWORD),
        ("Ebx", DWORD),
        ("Edx", DWORD),
        ("Ecx", DWORD),
        ("Eax", DWORD),
        ("Ebp", DWORD),
        ("Eip", DWORD),
        ("SegCs", DWORD),
        ("EFlags", DWORD),
        ("Esp", DWORD),
        ("SegSs", DWORD),
        ("ExtendedRegisters", BYTE * 512),
    ]


class M128A(ctypes.Structure):
    """128-bit value used for the vector registers of the x64 context."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Floating-point/SSE save area of the x64 context (512 bytes).

    ``S`` is kept only so existing code that referenced it keeps working.
    """

    _fields_ = [
        ("FltSave", BYTE * 512),
        ("S", DWORD * 32),
    ]


class CONTEXT64(ctypes.Structure):
    """Thread context layout for 64-bit (x64) Python.

    The debug registers are 64 bits wide and the save area is 512 bytes;
    declaring them smaller shifts every following register and leaves the
    buffer shorter than what GetThreadContext writes.
    NOTE(review): Win32 expects this structure 16-byte aligned; ctypes does
    not enforce that on Python 3.7 - confirm the allocation is aligned.
    """

    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


# CONTEXT is the layout matching the bitness of the running Python.
CONTEXT = CONTEXT64 if _IS_64BIT else CONTEXT32
LPCONTEXT = ctypes.POINTER(CONTEXT)
- my_debugger.py
import ctypes
import my_debugger_defines

# use_last_error makes ctypes capture GetLastError() right after each call, so
# the value is not clobbered before we read it.
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

# CreateToolhelp32Snapshot reports failure with INVALID_HANDLE_VALUE, not NULL.
_INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value


def _declare_prototypes():
    """Declare the signatures of the kernel32 functions used by the debugger.

    Without a restype ctypes treats every return value as a C int, which
    truncates 64-bit handles and addresses. With c_void_p a NULL result
    becomes None, so the failure checks below can actually detect it.
    """
    handle = ctypes.c_void_p
    dword = ctypes.c_ulong
    bool_ = ctypes.c_int
    size_t = ctypes.c_size_t
    prototypes = {
        "OpenProcess": (handle, [dword, bool_, dword]),
        "OpenThread": (handle, [dword, bool_, dword]),
        "CloseHandle": (bool_, [handle]),
        "CreateToolhelp32Snapshot": (handle, [dword, dword]),
        "Thread32First": (bool_, [handle, ctypes.POINTER(my_debugger_defines.THREADENTRY32)]),
        "Thread32Next": (bool_, [handle, ctypes.POINTER(my_debugger_defines.THREADENTRY32)]),
        "WaitForDebugEvent": (bool_, [ctypes.POINTER(my_debugger_defines.DEBUG_EVENT), dword]),
        "ContinueDebugEvent": (bool_, [dword, dword, dword]),
        "GetThreadContext": (bool_, [handle, my_debugger_defines.LPCONTEXT]),
        "SetThreadContext": (bool_, [handle, my_debugger_defines.LPCONTEXT]),
        "ReadProcessMemory": (bool_, [handle, ctypes.c_void_p, ctypes.c_void_p, size_t, ctypes.POINTER(size_t)]),
        "WriteProcessMemory": (bool_, [handle, ctypes.c_void_p, ctypes.c_void_p, size_t, ctypes.POINTER(size_t)]),
        "VirtualProtectEx": (bool_, [handle, ctypes.c_void_p, size_t, dword, ctypes.POINTER(dword)]),
        "GetModuleHandleW": (handle, [ctypes.c_wchar_p]),
        "GetProcAddress": (ctypes.c_void_p, [handle, ctypes.c_char_p]),
    }
    for name, (restype, argtypes) in prototypes.items():
        function = getattr(kernel32, name)
        function.restype = restype
        function.argtypes = argtypes


_declare_prototypes()


class debugger():
    """Minimal user-mode debugger for x64 Windows with software (INT3) breakpoints."""

    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # Software breakpoints: address -> original byte.
        self.breakpoints = {}
        self.first_breakpoint = True

    def load(self, path_to_exe):
        """Start path_to_exe under the debugger and remember its process handle."""
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        startupinfo.dwFlags = 0x1       # STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = 0x0   # SW_HIDE
        startupinfo.cb = ctypes.sizeof(startupinfo)
        if kernel32.CreateProcessW(path_to_exe, None, None, None, None,
                                   creation_flags, None, None,
                                   ctypes.byref(startupinfo),
                                   ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
            # Record the debuggee so that run() and detach() work after load().
            self.pid = process_information.dwProcessId
            self.h_process = self.open_process(self.pid)
            self.debugger_active = True
            # We keep our own handle; release the ones CreateProcessW returned.
            kernel32.CloseHandle(process_information.hThread)
            kernel32.CloseHandle(process_information.hProcess)
        else:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for pid (None on failure)."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process pid with DebugActiveProcess."""
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Pump debug events until debugger_active is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it and resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                          my_debugger_defines.INFINITE):
            return

        # The handle stays open while the event is handled, because the
        # breakpoint handler still needs it for SetThreadContext.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d"
              % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            self.exception = exception
            # c_void_p yields None for a NULL address; keep it printable.
            self.exception_address = record.ExceptionAddress or 0

            if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                print("Guard Page Access Detected.")
            elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                print("Single Stepping.")

        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle an INT3: skip foreign breakpoints, undo and rewind our own.

        Returns the continue status to pass to ContinueDebugEvent.
        """
        print("[*] Exception address: 0x%08x" % self.exception_address)
        if self.exception_address not in self.breakpoints:
            # Not one of ours. The very first one is the breakpoint Windows
            # raises when a debugger attaches.
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
            return my_debugger_defines.DBG_CONTINUE

        print("[*] Hit user defined breakpoint.")
        # Restore the original byte. The INT3 is gone afterwards, so the entry
        # is removed and bp_set can re-arm the address later.
        original_byte = self.breakpoints.pop(self.exception_address)
        self.write_process_memory(self.exception_address, original_byte)

        # Rewind RIP so the restored instruction is executed.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context:
            self.context.Rip -= 1
            kernel32.SetThreadContext(self.h_thread, ctypes.byref(self.context))
            self.debugger_active = True
        else:
            self.debugger_active = False
        return my_debugger_defines.DBG_CONTINUE

    def detach(self):
        """Stop debugging self.pid; return True on success."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for thread_id, or False on failure."""
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS,
                                       False, thread_id)
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the IDs of all threads owned by self.pid, or False on failure."""
        thread_entry = my_debugger_defines.THREADENTRY32()
        # TH32CS_SNAPTHREAD snapshots every thread in the system, so the
        # entries are filtered by owner below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is None or snapshot == _INVALID_HANDLE_VALUE:
            return False

        thread_list = []
        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
        while success:
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        Pass either an open handle (h_thread) or a thread ID. A handle passed
        in by the caller is left open; a handle opened here is closed here.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (my_debugger_defines.CONTEXT_FULL
                                | my_debugger_defines.CONTEXT_DEBUG_REGISTERS)
        opened_here = h_thread is None
        if opened_here:
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False
        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            if opened_here:
                kernel32.CloseHandle(h_thread)

    def read_process_memory(self, address, length):
        """Read length bytes at address in the debuggee; return bytes or False."""
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_size_t(0)
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf,
                                          length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes in data to address in the debuggee; return True/False."""
        count = ctypes.c_size_t(0)
        c_data = ctypes.c_char_p(data)
        return bool(kernel32.WriteProcessMemory(self.h_process, address, c_data,
                                                len(data), ctypes.byref(count)))

    def bp_set(self, address):
        """Plant an INT3 at address; return True when the breakpoint is armed."""
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return True

        # Make the page writable before patching it.
        old_protect = ctypes.c_ulong(0)
        kernel32.VirtualProtectEx(self.h_process, address, 1,
                                  my_debugger_defines.PAGE_EXECUTE_READWRITE,
                                  ctypes.byref(old_protect))
        # Save the original byte so the handler can restore it.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def func_resolve(self, dll, function):
        """Return the address of function (bytes) in the loaded module dll.

        The lookup happens in the debugger's own process; None is returned
        when the module is not loaded or the export does not exist.
        """
        handle = kernel32.GetModuleHandleW(dll)
        # GetModuleHandleW does not add a reference and a module handle is not
        # a kernel object, so it must not be passed to CloseHandle.
        return kernel32.GetProcAddress(handle, function)
- my_test.py
import my_debugger

# Attach to a running printf_loop.py and break on msvcrt!printf.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))

printf_address = debugger.func_resolve("msvcrt.dll", b"printf")
if printf_address is None:
    # func_resolve returns None when the module or export is missing.
    # Formatting None with %x would raise TypeError.
    print("[*] Could not resolve the address of printf.")
else:
    print("[*] Address of printf: 0x%08x" % printf_address)
    debugger.bp_set(printf_address)
    debugger.run()
debugger.detach()
- printf_loop.py
这个是被调试程序
import ctypes
import time

# Handle to the Microsoft C runtime, whose printf is the debugger's target.
msvcrt = ctypes.cdll.msvcrt

# Print a numbered line every two seconds, forever.
counter = 0
while True:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    counter += 1
    time.sleep(2)
运行结果正好符合预期
Event Code: 6 Thread ID: 18744
Event Code: 6 Thread ID: 18744
Event Code: 6 Thread ID: 18744
Event Code: 2 Thread ID: 18880
Event Code: 1 Thread ID: 18880
[*] Exception address: 0x7ffe68e10b10
[*] Hit the first breakpoint.
Event Code: 4 Thread ID: 18880
Event Code: 1 Thread ID: 18744
[*] Exception address: 0x7ffe68b88b50
[*] Hit user defined breakpoint.
[*] Finished debugging. Exiting...
硬件断点
这里 my_debugger_defines.py 修正了 CONTEXT 结构体:x64 下调试寄存器 Dr0~Dr7 必须定义为 DWORD64。软断点的例子没有用到调试寄存器,所以定义有误时也没有暴露出问题;到了硬件断点就会出故障。
- my_debugger_defines.py
import ctypes

# ---------------------------------------------------------------------------
# Basic Win32 type aliases
# ---------------------------------------------------------------------------
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer. c_ulong is only 32 bits on 64-bit Windows,
# which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) too small.
UINT_PTR = ctypes.c_size_t

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Process creation / access
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# ContinueDebugEvent status codes
DBG_CONTINUE = 0x00010002
DBG_EXCEPTION_NOT_HANDLED = 0x80010001

# "Wait forever" timeout. The previous value 0x00010002 was a copy of
# DBG_CONTINUE and made WaitForDebugEvent time out after about 65 seconds.
INFINITE = 0xFFFFFFFF

# CreateToolhelp32Snapshot flags
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext()
# NOTE(review): these are the x86 (CONTEXT_i386) values; winnt.h defines the
# AMD64 equivalents as 0x0010000B and 0x00100010 - confirm which the target needs.
CONTEXT_FULL = 0x00010007
CONTEXT_DEBUG_REGISTERS = 0x00010010

# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1

# Debug exception codes
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004

# Memory page permissions, used by VirtualProtectEx()
PAGE_EXECUTE_READWRITE = 0x00000040

# Hardware breakpoint conditions
HW_ACCESS = 0x00000003
HW_EXECUTE = 0x00000000
HW_WRITE = 0x00000001


# ---------------------------------------------------------------------------
# Structures needed by CreateProcess
# ---------------------------------------------------------------------------
class STARTUPINFO(ctypes.Structure):
    """ctypes mirror of the Win32 STARTUPINFO structure.

    The field names "dwXcountChars", "dwYcountChars" and "hStdIput" differ in
    spelling from the Win32 names; they are kept as-is so that existing
    attribute access keeps working.
    """
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes mirror of the Win32 PROCESS_INFORMATION structure."""
    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


# ---------------------------------------------------------------------------
# Debug event structures
# ---------------------------------------------------------------------------
class EXCEPTION_RECORD(ctypes.Structure):
    """Exception description; fields are attached below (self-referential)."""
    pass


EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of an EXCEPTION_DEBUG_EVENT."""
    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Event-specific part of DEBUG_EVENT; only the exception member is declared."""
    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # Members of the Win32 union that are not declared here:
        # ("CreateThread", CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread", EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll", LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString", OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo", RIP_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """Structure filled in by WaitForDebugEvent."""
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    """One entry of a thread snapshot (Thread32First / Thread32Next)."""
    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]


# ---------------------------------------------------------------------------
# Thread context
# ---------------------------------------------------------------------------
class FLOATING_SAVE_AREA(ctypes.Structure):
    """x87 save area of the 32-bit CONTEXT (not used by the x64 CONTEXT below)."""
    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


# The 32-bit CONTEXT (ContextFlags, Dr0-Dr7, FloatSave, Seg*, Edi..Eip, EFlags,
# Esp, SegSs, ExtendedRegisters[512]) is intentionally not declared: this
# module describes the x64 layout only.

class M128A(ctypes.Structure):
    """128-bit value used for the SSE/vector registers."""
    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Opaque stand-in for the FltSave (XMM_SAVE_AREA32) union of the x64 CONTEXT.

    The real union is 512 bytes, so it is declared as 128 DWORDs. A smaller
    array shifts every field that follows it.
    """
    _fields_ = [("S", DWORD * 128)]


class CONTEXT(ctypes.Structure):
    """x64 thread context as used by GetThreadContext / SetThreadContext.

    NOTE(review): winnt.h declares this structure with 16-byte alignment;
    ctypes does not enforce that - confirm the allocation is aligned if
    GetThreadContext fails.
    """
    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        # Debug registers are 64 bits wide on x64.
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


LPCONTEXT = ctypes.POINTER(CONTEXT)
- my_debugger.py
import ctypes
import my_debugger_defines

# use_last_error makes ctypes capture GetLastError() right after each call, so
# the value is not clobbered before we read it.
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

# CreateToolhelp32Snapshot reports failure with INVALID_HANDLE_VALUE, not NULL.
_INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value


def _declare_prototypes():
    """Declare the signatures of the kernel32 functions used by the debugger.

    Without a restype ctypes treats every return value as a C int, which
    truncates 64-bit handles and addresses. With c_void_p a NULL result
    becomes None, so the failure checks below can actually detect it.
    """
    handle = ctypes.c_void_p
    dword = ctypes.c_ulong
    bool_ = ctypes.c_int
    size_t = ctypes.c_size_t
    prototypes = {
        "OpenProcess": (handle, [dword, bool_, dword]),
        "OpenThread": (handle, [dword, bool_, dword]),
        "CloseHandle": (bool_, [handle]),
        "CreateToolhelp32Snapshot": (handle, [dword, dword]),
        "Thread32First": (bool_, [handle, ctypes.POINTER(my_debugger_defines.THREADENTRY32)]),
        "Thread32Next": (bool_, [handle, ctypes.POINTER(my_debugger_defines.THREADENTRY32)]),
        "WaitForDebugEvent": (bool_, [ctypes.POINTER(my_debugger_defines.DEBUG_EVENT), dword]),
        "ContinueDebugEvent": (bool_, [dword, dword, dword]),
        "GetThreadContext": (bool_, [handle, my_debugger_defines.LPCONTEXT]),
        "SetThreadContext": (bool_, [handle, my_debugger_defines.LPCONTEXT]),
        "ReadProcessMemory": (bool_, [handle, ctypes.c_void_p, ctypes.c_void_p, size_t, ctypes.POINTER(size_t)]),
        "WriteProcessMemory": (bool_, [handle, ctypes.c_void_p, ctypes.c_void_p, size_t, ctypes.POINTER(size_t)]),
        "VirtualProtectEx": (bool_, [handle, ctypes.c_void_p, size_t, dword, ctypes.POINTER(dword)]),
        "GetModuleHandleW": (handle, [ctypes.c_wchar_p]),
        "GetProcAddress": (ctypes.c_void_p, [handle, ctypes.c_char_p]),
    }
    for name, (restype, argtypes) in prototypes.items():
        function = getattr(kernel32, name)
        function.restype = restype
        function.argtypes = argtypes


_declare_prototypes()


class debugger():
    """Minimal user-mode debugger for x64 Windows.

    Supports software (INT3) breakpoints and hardware breakpoints set through
    the debug registers Dr0-Dr3 / Dr7.
    """

    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # Software breakpoints: address -> original byte.
        self.breakpoints = {}
        self.first_breakpoint = True
        # Hardware breakpoints: slot (0-3) -> (address, length, condition).
        self.hardware_breakpoints = {}

    def load(self, path_to_exe):
        """Start path_to_exe under the debugger and remember its process handle."""
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        startupinfo.dwFlags = 0x1       # STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = 0x0   # SW_HIDE
        startupinfo.cb = ctypes.sizeof(startupinfo)
        if kernel32.CreateProcessW(path_to_exe, None, None, None, None,
                                   creation_flags, None, None,
                                   ctypes.byref(startupinfo),
                                   ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
            # Record the debuggee so that run() and detach() work after load().
            self.pid = process_information.dwProcessId
            self.h_process = self.open_process(self.pid)
            self.debugger_active = True
            # We keep our own handle; release the ones CreateProcessW returned.
            kernel32.CloseHandle(process_information.hThread)
            kernel32.CloseHandle(process_information.hProcess)
        else:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for pid (None on failure)."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process pid with DebugActiveProcess."""
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Pump debug events until debugger_active is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it and resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                          my_debugger_defines.INFINITE):
            return

        # The handle stays open while the event is handled, because the
        # breakpoint handler still needs it for SetThreadContext.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d"
              % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            self.exception = exception
            # c_void_p yields None for a NULL address; keep it printable.
            self.exception_address = record.ExceptionAddress or 0

            if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                print("Guard Page Access Detected.")
            elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                print("Single Stepping.")
                # Use the handler's verdict instead of discarding it.
                continue_status = self.exception_handler_single_step()

        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle an INT3: skip foreign breakpoints, undo and rewind our own.

        Returns the continue status to pass to ContinueDebugEvent.
        """
        print("[*] Exception address: 0x%08x" % self.exception_address)
        if self.exception_address not in self.breakpoints:
            # Not one of ours. The very first one is the breakpoint Windows
            # raises when a debugger attaches.
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
            return my_debugger_defines.DBG_CONTINUE

        print("[*] Hit user defined breakpoint.")
        # Restore the original byte. The INT3 is gone afterwards, so the entry
        # is removed and bp_set can re-arm the address later.
        original_byte = self.breakpoints.pop(self.exception_address)
        self.write_process_memory(self.exception_address, original_byte)

        # Rewind RIP so the restored instruction is executed.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context:
            self.context.Rip -= 1
            kernel32.SetThreadContext(self.h_thread, ctypes.byref(self.context))
            self.debugger_active = True
        else:
            self.debugger_active = False
        return my_debugger_defines.DBG_CONTINUE

    def exception_handler_single_step(self):
        """Handle a single-step exception raised by one of our hardware breakpoints.

        The low four bits of Dr6 tell which debug register fired. If none of
        them belongs to a registered breakpoint, the exception is not ours and
        DBG_EXCEPTION_NOT_HANDLED is returned.
        """
        dr6 = self.context.Dr6 if self.context else 0
        for slot in range(4):
            if dr6 & (1 << slot) and slot in self.hardware_breakpoints:
                break
        else:
            return my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED

        # The breakpoint is one-shot: remove it once it has fired.
        if self.bp_del_hw(slot):
            print("[*] Hardware breakpoint removed.")
            return my_debugger_defines.DBG_CONTINUE
        return my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED

    def bp_del_hw(self, slot):
        """Clear hardware breakpoint slot (0-3) in every thread; return True."""
        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            try:
                context = self.get_thread_context(h_thread=h_thread)
                if not context:
                    continue
                # Disable the breakpoint (local-enable bit of the slot).
                context.Dr7 &= ~(1 << (slot * 2))
                # Zero the address register Dr0..Dr3.
                setattr(context, "Dr%d" % slot, 0)
                # Clear the condition and the length bits of the slot.
                context.Dr7 &= ~(3 << ((slot * 4) + 16))
                context.Dr7 &= ~(3 << ((slot * 4) + 18))
                kernel32.SetThreadContext(h_thread, ctypes.byref(context))
            finally:
                kernel32.CloseHandle(h_thread)

        del self.hardware_breakpoints[slot]
        return True

    def detach(self):
        """Stop debugging self.pid; return True on success."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for thread_id, or False on failure."""
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS,
                                       False, thread_id)
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the IDs of all threads owned by self.pid, or False on failure."""
        thread_entry = my_debugger_defines.THREADENTRY32()
        # TH32CS_SNAPTHREAD snapshots every thread in the system, so the
        # entries are filtered by owner below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is None or snapshot == _INVALID_HANDLE_VALUE:
            return False

        thread_list = []
        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
        while success:
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        Pass either an open handle (h_thread) or a thread ID. A handle passed
        in by the caller is left open; a handle opened here is closed here.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (my_debugger_defines.CONTEXT_FULL
                                | my_debugger_defines.CONTEXT_DEBUG_REGISTERS)
        opened_here = h_thread is None
        if opened_here:
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False
        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            if opened_here:
                kernel32.CloseHandle(h_thread)

    def read_process_memory(self, address, length):
        """Read length bytes at address in the debuggee; return bytes or False."""
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_size_t(0)
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf,
                                          length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes in data to address in the debuggee; return True/False."""
        count = ctypes.c_size_t(0)
        c_data = ctypes.c_char_p(data)
        return bool(kernel32.WriteProcessMemory(self.h_process, address, c_data,
                                                len(data), ctypes.byref(count)))

    def bp_set(self, address):
        """Plant an INT3 at address; return True when the breakpoint is armed."""
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return True

        # Make the page writable before patching it.
        old_protect = ctypes.c_ulong(0)
        kernel32.VirtualProtectEx(self.h_process, address, 1,
                                  my_debugger_defines.PAGE_EXECUTE_READWRITE,
                                  ctypes.byref(old_protect))
        # Save the original byte so the handler can restore it.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def bp_set_hw(self, address, length, condition):
        """Set a hardware breakpoint in every thread of the debuggee.

        length is 1, 2 or 4 bytes; condition is HW_ACCESS, HW_EXECUTE or
        HW_WRITE. Returns False for invalid arguments or when all four debug
        registers are in use, True otherwise.
        """
        if length not in (1, 2, 4):
            return False
        # Dr7 encodes the length as length - 1.
        length -= 1

        if condition not in (my_debugger_defines.HW_ACCESS,
                             my_debugger_defines.HW_EXECUTE,
                             my_debugger_defines.HW_WRITE):
            return False

        # First free debug register slot, if any.
        available = next((slot for slot in range(4)
                          if slot not in self.hardware_breakpoints), None)
        if available is None:
            return False

        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            try:
                context = self.get_thread_context(h_thread=h_thread)
                if not context:
                    continue
                # Enable the slot and store the address in Dr0..Dr3.
                context.Dr7 |= 1 << (available * 2)
                setattr(context, "Dr%d" % available, address)
                # Clear stale condition/length bits of the slot, then set ours.
                context.Dr7 &= ~(0xF << ((available * 4) + 16))
                context.Dr7 |= condition << ((available * 4) + 16)
                context.Dr7 |= length << ((available * 4) + 18)
                kernel32.SetThreadContext(h_thread, ctypes.byref(context))
            finally:
                kernel32.CloseHandle(h_thread)

        self.hardware_breakpoints[available] = (address, length, condition)
        return True

    def func_resolve(self, dll, function):
        """Return the address of function (bytes) in the loaded module dll.

        The lookup happens in the debugger's own process; None is returned
        when the module is not loaded or the export does not exist.
        """
        handle = kernel32.GetModuleHandleW(dll)
        # GetModuleHandleW does not add a reference and a module handle is not
        # a kernel object, so it must not be passed to CloseHandle.
        return kernel32.GetProcAddress(handle, function)
- printf_loop.py
import ctypes
import time

# Handle to the Microsoft C runtime, whose printf is the debugger's target.
msvcrt = ctypes.cdll.msvcrt

# Print a numbered line every two seconds, forever.
counter = 0
while True:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    counter += 1
    time.sleep(2)
- my_test.py
import my_debugger
from my_debugger_defines import HW_EXECUTE

# Attach to a running printf_loop.py and set a hardware execute breakpoint on
# msvcrt!printf.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))

printf_address = debugger.func_resolve("msvcrt.dll", b"printf")
if printf_address is None:
    # func_resolve returns None when the module or export is missing.
    # Formatting None with %x would raise TypeError.
    print("[*] Could not resolve the address of printf.")
else:
    print("[*] Address of printf: 0x%08x" % printf_address)
    debugger.bp_set_hw(printf_address, 1, HW_EXECUTE)
    debugger.run()
debugger.detach()
运行结果(这里显示了一些调试的print)
Event Code: 2 Thread ID: 9928
Event Code: 1 Thread ID: 9928
0x80000003
[*] Exception address: 0x7ffe68e10b10
[*] Hit the first breakpoint.
Event Code: 4 Thread ID: 9928
Event Code: 1 Thread ID: 18456
0x80000004
Single Stepping.
[*] Hardware breakpoint removed.
Event Code: 2 Thread ID: 21584
内存断点
内存断点本质上不是真正的断点。当一个调试器设置一个内存断点时,调试器实质上所做的是改变一个内存区域或一个内存页的访问权限。内存页是操作系统可以一次处理的最小内存块。操作系统每分配一个内存页时,都会为这个内存页设置访问权限,该权限决定了这个内存页被访问的方式。以下是几个不同的内存页访问权限的例子:
- 页可执行:允许进程执行页上的代码,但是如果进程试图读写这个页将导致非法内存操作异常;
- 页可读:进程只能从这个内存页中读取数据;任何企图写入数据或者执行代码的操作会导致非法内存操作异常;
- 页可写:允许进程在这个内存页上写入数据;
- 保护页:对保护页任何类型的访问将导致一次性异常, 之后这个内存页会恢复到之前的状态。
主要过程
VirtualQueryEx :找内存断点区域的内存页
VirtualProtectEx :修改内存页属性为保护页
- my_debugger_defines.py
import ctypesBYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
LPVOID = ctypes.c_void_p
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
UINT_PTR = ctypes.c_ulong
SIZE_T = ctypes.c_ulongDEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFFDBG_CONTINUE = 0x00010002
DBG_EXCEPTION_NOT_HANDLED = 0x80010001INFINITE = 0x00010002
THREAD_ALL_ACCESS = 0x001F03FF# snapshot
TH32CS_SNAPTHREAD = 0x00000004# Context flags for GetThreadContext()
CONTEXT_FULL = 0x00010007
CONTEXT_DEBUG_REGISTERS = 0x00010010# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1# debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004# Memory page permissions, used by VirtualProtect()
PAGE_EXECUTE_READWRITE = 0x00000040# Hardware breakpoint conditions
HW_ACCESS = 0x00000003
HW_EXECUTE = 0x00000000
HW_WRITE = 0x00000001# Memory page permissions, used by VirtualProtect()
PAGE_NOACCESS = 0x00000001
PAGE_READONLY = 0x00000002
PAGE_READWRITE = 0x00000004
PAGE_WRITECOPY = 0x00000008
PAGE_EXECUTE = 0x00000010
PAGE_EXECUTE_READ = 0x00000020
PAGE_EXECUTE_READWRITE = 0x00000040
PAGE_EXECUTE_WRITECOPY = 0x00000080
PAGE_GUARD = 0x00000100
PAGE_NOCACHE = 0x00000200
PAGE_WRITECOMBINE = 0x00000400# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {
# DWORD cb;
# LPTSTR lpReserved;
# LPTSTR lpDesktop;
# LPTSTR lpTitle;
# DWORD dwX;
# DWORD dwY;
# DWORD dwXSize;
# DWORD dwYSize;
# DWORD dwXCountChars;
# DWORD dwYCountChars;
# DWORD dwFillAttribute;
# DWORD dwFlags;
# WORD wShowWindow;
# WORD cbReserved2;
# LPBYTE lpReserved2;
# HANDLE hStdInput;
# HANDLE hStdOutput;
# HANDLE hStdError;
# } STARTUPINFO, *LPSTARTUPINFO;class STARTUPINFO(ctypes.Structure):_fields_ = [("cb", DWORD),("lpReserved", LPTSTR),("lpDesktop", LPTSTR),("lpTitle", LPTSTR),("dwX", DWORD),("dwY", DWORD),("dwXSize", DWORD),("dwYSize", DWORD),("dwXcountChars", DWORD),("dwYcountChars", DWORD),("dwFillAttribute", DWORD),("dwFlags", DWORD),("wShowWindow", WORD),("cbReserved2", WORD),("lpReserved2", LPBYTE),("hStdIput", HANDLE),("hStdOutput", HANDLE),("hStdError", HANDLE) ]# typedef struct _PROCESS_INFORMATION {
# HANDLE hProcess;
# HANDLE hThread;
# DWORD dwProcessId;
# DWORD dwThreadId;
# } PROCESS_INFORMATION, *LPPROCESS_INFORMATION;class PROCESS_INFORMATION(ctypes.Structure):_fields_ = [("hProcess", HANDLE),("hThread", HANDLE),("dwProcessId", DWORD),("dwThreadId", DWORD),]class EXCEPTION_RECORD(ctypes.Structure):pass
EXCEPTION_RECORD._fields_ = [("ExceptionCode", DWORD),("ExceptionFlags", DWORD),("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),("ExceptionAddress", PVOID),("NumberParameters", DWORD),("ExceptionInformation", UINT_PTR * 15),]class EXCEPTION_DEBUG_INFO(ctypes.Structure):_fields_ = [("ExceptionRecord", EXCEPTION_RECORD),("dwFirstChance", DWORD),]class DEBUG_EVENT_UNION(ctypes.Union):_fields_ = [("Exception", EXCEPTION_DEBUG_INFO),
# ("CreateThread", CREATE_THREAD_DEBUG_INFO),
# ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
# ("ExitThread", EXIT_THREAD_DEBUG_INFO),
# ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
# ("LoadDll", LOAD_DLL_DEBUG_INFO),
# ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
# ("DebugString", OUTPUT_DEBUG_STRING_INFO),
# ("RipInfo", RIP_INFO),] class DEBUG_EVENT(ctypes.Structure):_fields_ = [("dwDebugEventCode", DWORD),("dwProcessId", DWORD),("dwThreadId", DWORD),("u", DEBUG_EVENT_UNION),]class THREADENTRY32(ctypes.Structure):_fields_ = [("dwSize", DWORD),("cntUsage", DWORD),("th32ThreadID", DWORD),("th32OwnerProcessID", DWORD),("tpBasePri", DWORD),("tpDeltaPri", DWORD),("dwFlags", DWORD),]class FLOATING_SAVE_AREA(ctypes.Structure):_fields_ = [("ControlWord", DWORD),("StatusWord", DWORD),("TagWord", DWORD),("ErrorOffset", DWORD),("ErrorSelector", DWORD),("DataOffset", DWORD),("DataSelector", DWORD),("RegisterArea", BYTE * 80),("Cr0NpxState", DWORD),
]# class CONTEXT(ctypes.Structure):
# _fields_ = [# ("ContextFlags", DWORD),
# ("Dr0", DWORD),
# ("Dr1", DWORD),
# ("Dr2", DWORD),
# ("Dr3", DWORD),
# ("Dr6", DWORD),
# ("Dr7", DWORD),
# ("FloatSave", FLOATING_SAVE_AREA),
# ("SegGs", DWORD),
# ("SegFs", DWORD),
# ("SegEs", DWORD),
# ("SegDs", DWORD),
# ("Edi", DWORD),
# ("Esi", DWORD),
# ("Ebx", DWORD),
# ("Edx", DWORD),
# ("Ecx", DWORD),
# ("Eax", DWORD),
# ("Ebp", DWORD),
# ("Eip", DWORD),
# ("SegCs", DWORD),
# ("EFlags", DWORD),
# ("Esp", DWORD),
# ("SegSs", DWORD),
# ("ExtendedRegisters", BYTE * 512),
# ]DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlongclass M128A(ctypes.Structure):_fields_ = [("Low", ULONGLONG),("High", LONGLONG)]# class DUMMYUNIONNAME(Union):
# _fields_=[
# ("FltSave", XMM_SAVE_AREA32),
# ("DummyStruct", DUMMYSTRUCTNAME)
# ]# class DUMMYSTRUCTNAME(Structure):
# _fields_=[
# ("Header", M128A * 2),
# ("Legacy", M128A * 8),
# ("Xmm0", M128A),
# ("Xmm1", M128A),
# ("Xmm2", M128A),
# ("Xmm3", M128A),
# ("Xmm4", M128A),
# ("Xmm5", M128A),
# ("Xmm6", M128A),
# ("Xmm7", M128A),
# ("Xmm8", M128A),
# ("Xmm9", M128A),
# ("Xmm10", M128A),
# ("Xmm11", M128A),
# ("Xmm12", M128A),
# ("Xmm13", M128A),
# ("Xmm14", M128A),
# ("Xmm15", M128A)
# ]# class XMM_SAVE_AREA32(Structure):
# _pack_ = 1
# _fields_ = [
# ('ControlWord', WORD),
# ('StatusWord', WORD),
# ('TagWord', BYTE),
# ('Reserved1', BYTE),
# ('ErrorOpcode', WORD),
# ('ErrorOffset', DWORD),
# ('ErrorSelector', WORD),
# ('Reserved2', WORD),
# ('DataOffset', DWORD),
# ('DataSelector', WORD),
# ('Reserved3', WORD),
# ('MxCsr', DWORD),
# ('MxCsr_Mask', DWORD),
# ('FloatRegisters', M128A * 8),
# ('XmmRegisters', M128A * 16),
# ('Reserved4', BYTE * 96)
# ]


class CONTEXT_UNION(ctypes.Union):
    """Opaque stand-in for the floating-point/XMM union inside the x64 CONTEXT.

    In winnt.h this union holds an XMM_SAVE_AREA32, which is 512 bytes.  The
    placeholder must have exactly that size, otherwise every field after it
    (VectorRegister, DebugControl, ...) lands at the wrong offset and the
    structure is shorter than the one the kernel reads and writes.
    128 DWORDs == 512 bytes with the 4-byte DWORD used on Windows.
    """
    _fields_ = [("S", DWORD * 128)]


class CONTEXT(ctypes.Structure):
    """Thread context (register state) for x64, as used by
    GetThreadContext / SetThreadContext.

    NOTE(review): winnt.h declares this structure with 16-byte alignment;
    this ctypes definition does not enforce it - confirm allocations are
    suitably aligned if GetThreadContext fails unexpectedly.
    """
    _fields_ = [
        # Home space for register parameters.
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        # Control flags.
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        # Segment registers and processor flags.
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        # Debug registers.
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        # Integer registers.
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        # Program counter.
        ("Rip", DWORD64),
        # Floating-point / XMM state (opaque, 512 bytes).
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        # Vector registers.
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        # Special debug control registers.
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


LPCONTEXT = ctypes.POINTER(CONTEXT)


class PROC_STRUCT(ctypes.Structure):
    """Processor-architecture half of the SYSTEM_INFO leading union."""
    _fields_ = [
        ("wProcessorArchitecture", WORD),
        ("wReserved", WORD),
    ]
class SYSTEM_INFO_UNION(ctypes.Union):
    """Leading union of SYSTEM_INFO: either the legacy OEM id or the
    processor-architecture structure, sharing the same storage."""
    _fields_ = [
        ("dwOemId", DWORD),
        ("sProcStruc", PROC_STRUCT),
    ]
class SYSTEM_INFO(ctypes.Structure):
    """System information block filled in by GetSystemInfo.

    The debugger reads ``dwPageSize`` from it to size memory breakpoints.
    """
    _fields_ = [
        ("uSysInfo", SYSTEM_INFO_UNION),
        ("dwPageSize", DWORD),
        ("lpMinimumApplicationAddress", LPVOID),
        ("lpMaximumApplicationAddress", LPVOID),
        # DWORD_PTR in the Win32 header: pointer-sized, so 8 bytes on 64-bit.
        # Declaring it as a 4-byte DWORD shifts every following field.
        ("dwActiveProcessorMask", ctypes.c_size_t),
        ("dwNumberOfProcessors", DWORD),
        ("dwProcessorType", DWORD),
        ("dwAllocationGranularity", DWORD),
        ("wProcessorLevel", WORD),
        ("wProcessorRevision", WORD),
    ]


# class MEMORY_BASIC_INFORMATION(ctypes.Structure):
# _fields_ = [
# ("BaseAddress", PVOID),
# ("AllocationBase", PVOID),
# ("AllocationProtect", DWORD),
# ("PartitionId", WORD),
# ("RegionSize", SIZE_T),
# ("State", DWORD),
# ("Protect", DWORD),
# ("Type", DWORD),
# ]
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
    """Region description returned by VirtualQueryEx, laid out for a 64-bit
    target with the padding members spelled out explicitly."""
    _fields_ = [
        ("BaseAddress", ULONGLONG),
        ("AllocationBase", ULONGLONG),
        ("AllocationProtect", DWORD),
        ("__alignment1", DWORD),
        ("RegionSize", ULONGLONG),
        ("State", DWORD),
        ("Protect", DWORD),
        ("Type", DWORD),
        ("__alignment2", DWORD),
    ]


PMEMORY_BASIC_INFORMATION = ctypes.POINTER(MEMORY_BASIC_INFORMATION)
- my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32

# What CreateToolhelp32Snapshot returns on failure ((HANDLE)-1), as seen
# through a c_void_p restype.
_INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value


class debugger():
    """Minimal user-mode debugger for 64-bit Windows built on kernel32.

    Supports launching or attaching to a process, pumping debug events, and
    setting software (INT3), hardware (DR0-DR3) and memory (guard page)
    breakpoints.
    """

    def __init__(self):
        self._declare_prototypes()

        self.h_process = None
        self.pid = None
        self.debugger_active = False

        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None

        # Software breakpoints: address -> original byte.
        self.breakpoints = {}
        self.first_breakpoint = True

        # Hardware breakpoints: slot (0-3) -> (address, length, condition).
        self.hardware_breakpoints = {}

        # Memory breakpoints.
        self.guarded_pages = []
        self.memory_breakpoints = {}

        # Query the system's default memory page size.
        system_info = my_debugger_defines.SYSTEM_INFO()
        kernel32.GetSystemInfo(ctypes.byref(system_info))
        self.page_size = system_info.dwPageSize

    @staticmethod
    def _declare_prototypes():
        """Declare ctypes prototypes for the kernel32 calls that pass
        handles, pointers or SIZE_T values.

        Without them ctypes marshals arguments and results as C ``int``,
        which truncates 64-bit values, and SIZE_T out-parameters would be
        written into 4-byte buffers.  With a ``c_void_p`` restype a NULL
        handle is returned as ``None``.
        """
        defs = my_debugger_defines
        handle = ctypes.c_void_p
        pointer = ctypes.c_void_p
        dword = ctypes.c_ulong
        win_bool = ctypes.c_int
        size_t = ctypes.c_size_t
        p_thread_entry = ctypes.POINTER(defs.THREADENTRY32)

        prototypes = {
            "OpenProcess": (handle, [dword, win_bool, dword]),
            "OpenThread": (handle, [dword, win_bool, dword]),
            "CloseHandle": (win_bool, [handle]),
            "CreateToolhelp32Snapshot": (handle, [dword, dword]),
            "Thread32First": (win_bool, [handle, p_thread_entry]),
            "Thread32Next": (win_bool, [handle, p_thread_entry]),
            "GetThreadContext": (win_bool, [handle, defs.LPCONTEXT]),
            "SetThreadContext": (win_bool, [handle, defs.LPCONTEXT]),
            "ReadProcessMemory": (
                win_bool,
                [handle, pointer, pointer, size_t, ctypes.POINTER(size_t)],
            ),
            "WriteProcessMemory": (
                win_bool,
                [handle, pointer, pointer, size_t, ctypes.POINTER(size_t)],
            ),
            "VirtualProtectEx": (
                win_bool,
                [handle, pointer, size_t, dword, ctypes.POINTER(dword)],
            ),
            "VirtualQueryEx": (
                size_t,
                [handle, pointer, defs.PMEMORY_BASIC_INFORMATION, size_t],
            ),
            "GetModuleHandleW": (handle, [ctypes.c_wchar_p]),
            "GetProcAddress": (pointer, [handle, ctypes.c_char_p]),
        }
        for name, (restype, argtypes) in prototypes.items():
            function = getattr(kernel32, name)
            function.restype = restype
            function.argtypes = argtypes

    def load(self, path_to_exe):
        """Launch ``path_to_exe`` under the debugger (DEBUG_PROCESS).

        On success the process id and a full-access process handle are
        stored on the instance and the debugger is marked active, so that
        ``run()`` works after ``load()`` just as it does after ``attach()``.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        # dwFlags = 0x1 makes wShowWindow effective; 0x0 hides the window.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   ctypes.byref(startupinfo),
                                   ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
            self.pid = process_information.dwProcessId
            # Keep our own process handle for later memory access.
            self.h_process = self.open_process(process_information.dwProcessId)
            self.debugger_active = True
            # The handles returned by CreateProcess are not used afterwards;
            # release them so they do not leak.
            for leftover in (process_information.hThread,
                             process_information.hProcess):
                if leftover:
                    kernel32.CloseHandle(leftover)
        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())

    def open_process(self, pid):
        """Open process ``pid`` with PROCESS_ALL_ACCESS and return the handle
        (``None`` if the process could not be opened)."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS,
                                    False, pid)

    def attach(self, pid):
        """Attach the debugger to the running process ``pid``."""
        pid = int(pid)
        self.h_process = self.open_process(pid)

        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Pump debug events until the debugger is deactivated."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it, and resume the debuggee."""
        defs = my_debugger_defines
        debug_event = defs.DEBUG_EVENT()
        continue_status = defs.DBG_CONTINUE

        # INFINITE: block until an event arrives; WaitForDebugEvent then
        # fills in debug_event.
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                          defs.INFINITE):
            return

        # Thread handle and context of the thread that raised the event.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)

        print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode,
                                                debug_event.dwThreadId))

        if debug_event.dwDebugEventCode == defs.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            print("0x%08x" % exception)
            self.exception = exception
            self.exception_address = record.ExceptionAddress

            if exception == defs.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif exception == defs.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == defs.EXCEPTION_GUARD_PAGE:
                # Memory breakpoint.
                print("Guard Page Access Detected.")
            elif exception == defs.EXCEPTION_SINGLE_STEP:
                # Hardware breakpoint.
                print("Single Stepping.")
                continue_status = self.exception_handler_single_step()

        # Resume the thread that reported the event.
        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)

        # The per-event thread handle is no longer needed.
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle an INT3 exception and return the continue status.

        A breakpoint that is not ours is let through (the first one is the
        loader's initial breakpoint).  For one of ours the original byte is
        restored and the instruction pointer is rewound by one so the
        original instruction executes.
        """
        print("[*] Exception address: 0x%08x" % self.exception_address)

        if self.exception_address not in self.breakpoints:
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
            return my_debugger_defines.DBG_CONTINUE

        print("[*] Hit user defined breakpoint.")
        # Put the original byte back.
        self.write_process_memory(self.exception_address,
                                  self.breakpoints[self.exception_address])

        # Fetch a fresh context and move RIP back onto the restored byte.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context:
            self.context.Rip -= 1
            kernel32.SetThreadContext(self.h_thread,
                                      ctypes.byref(self.context))
            self.debugger_active = True
        else:
            self.debugger_active = False

        return my_debugger_defines.DBG_CONTINUE

    def exception_handler_single_step(self):
        """Handle a single-step exception and return the continue status.

        If DR6 shows that one of our hardware breakpoints fired, that
        breakpoint is removed and DBG_CONTINUE is returned; otherwise the
        exception is reported as not handled.
        """
        slot = None
        if self.context:
            for candidate in range(4):
                fired = self.context.Dr6 & (1 << candidate)
                if fired and candidate in self.hardware_breakpoints:
                    slot = candidate
                    break

        if slot is None:
            # Not a single step caused by one of our hardware breakpoints.
            return my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED

        continue_status = my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED
        if self.bp_del_hw(slot):
            continue_status = my_debugger_defines.DBG_CONTINUE
            print("[*] Hardware breakpoint removed.")
        return continue_status

    def _apply_thread_context(self, thread_id, context):
        """Write ``context`` to thread ``thread_id``; return True on success."""
        h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False
        try:
            return bool(kernel32.SetThreadContext(h_thread,
                                                  ctypes.byref(context)))
        finally:
            kernel32.CloseHandle(h_thread)

    def bp_del_hw(self, slot):
        """Remove the hardware breakpoint in ``slot`` (0-3) from every thread
        of the debuggee and from the internal table."""
        for thread_id in self.enumerate_threads() or []:
            context = self.get_thread_context(thread_id=thread_id)
            if not context:
                continue

            # Disable the breakpoint.
            context.Dr7 &= ~(1 << (slot * 2))

            # Clear the breakpoint address.
            if slot in (0, 1, 2, 3):
                setattr(context, "Dr%d" % slot, 0)

            # Clear the condition bits, then the length bits, in DR7.
            context.Dr7 &= ~(3 << ((slot * 4) + 16))
            context.Dr7 &= ~(3 << ((slot * 4) + 18))

            self._apply_thread_context(thread_id, context)

        self.hardware_breakpoints.pop(slot, None)
        return True

    def detach(self):
        """Stop debugging the attached process; return True on success."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Open thread ``thread_id`` with THREAD_ALL_ACCESS.

        Returns the handle, or False if it could not be opened.
        """
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS,
                                       False, thread_id)
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by ``self.pid``.

        The snapshot lists every thread in the system (the process id
        argument is not a filter for TH32CS_SNAPTHREAD), so each entry is
        compared against ``self.pid``.  Returns False if the snapshot could
        not be created.
        """
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []

        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid or 0)
        if snapshot is None or snapshot == _INVALID_HANDLE_VALUE:
            return False

        try:
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            success = kernel32.Thread32First(snapshot,
                                             ctypes.byref(thread_entry))
            while success:
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(snapshot,
                                                ctypes.byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        The thread is identified either by an already open ``h_thread``
        (which stays open and owned by the caller) or by ``thread_id`` (a
        temporary handle is opened and closed here).
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (my_debugger_defines.CONTEXT_FULL
                                | my_debugger_defines.CONTEXT_DEBUG_REGISTERS)

        opened_here = h_thread is None
        if opened_here:
            if thread_id is None:
                return False
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False

        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            # Only release handles this method opened itself.
            if opened_here:
                kernel32.CloseHandle(h_thread)

    def read_process_memory(self, address, length):
        """Read ``length`` bytes at ``address`` in the debuggee.

        Returns the bytes read, or False if the read failed.
        """
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_size_t(0)

        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf,
                                          length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes ``data`` to ``address`` in the debuggee; return
        True on success, False otherwise."""
        count = ctypes.c_size_t(0)
        c_data = ctypes.c_char_p(data)

        return bool(kernel32.WriteProcessMemory(self.h_process, address,
                                                c_data, len(data),
                                                ctypes.byref(count)))

    def bp_set(self, address):
        """Set a software (INT3) breakpoint at ``address``.

        Returns True if the breakpoint is in place (including when it was
        already registered), False if the target byte could not be read or
        overwritten.
        """
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return True

        # Best effort: make the byte writable.  A failure here surfaces
        # through the read/write calls below.
        old_protect = ctypes.c_ulong(0)
        kernel32.VirtualProtectEx(self.h_process, address, 1,
                                  my_debugger_defines.PAGE_EXECUTE_READWRITE,
                                  ctypes.byref(old_protect))

        # Save the original byte before overwriting it.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False

        # Write the INT3 opcode and register the breakpoint.
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def bp_set_hw(self, address, length, condition):
        """Set a hardware breakpoint on every thread of the debuggee.

        ``length`` must be 1, 2 or 4 bytes and ``condition`` one of
        HW_ACCESS, HW_EXECUTE or HW_WRITE.  Returns False if an argument is
        invalid or all four debug register slots are in use.
        """
        if length not in (1, 2, 4):
            return False
        # DR7 encodes the length as (length - 1).
        length -= 1

        if condition not in (my_debugger_defines.HW_ACCESS,
                             my_debugger_defines.HW_EXECUTE,
                             my_debugger_defines.HW_WRITE):
            return False

        # Find a free debug register slot.
        available = next((slot for slot in range(4)
                          if slot not in self.hardware_breakpoints), None)
        if available is None:
            return False

        for thread_id in self.enumerate_threads() or []:
            context = self.get_thread_context(thread_id=thread_id)
            if not context:
                continue

            # Enable the slot in DR7.
            context.Dr7 |= 1 << (available * 2)

            # Store the address in the matching DR0-DR3 register.
            setattr(context, "Dr%d" % available, address)

            # Trigger condition, then length.
            context.Dr7 |= condition << ((available * 4) + 16)
            context.Dr7 |= length << ((available * 4) + 18)

            self._apply_thread_context(thread_id, context)

        self.hardware_breakpoints[available] = (address, length, condition)
        return True

    def bp_set_mem(self, address, size):
        """Set a memory breakpoint covering ``size`` bytes from ``address``.

        Every page overlapping the range gets PAGE_GUARD added to the
        protection reported for the first page.  Returns True on success,
        False if the region could not be queried or a page could not be
        protected.
        """
        mbi = my_debugger_defines.MEMORY_BASIC_INFORMATION()

        # VirtualQueryEx gives the base address of the page holding address.
        length = kernel32.VirtualQueryEx(self.h_process, address,
                                         ctypes.byref(mbi),
                                         ctypes.sizeof(mbi))
        if length < ctypes.sizeof(mbi):
            print("[*] Error with error code %d." % kernel32.GetLastError())
            return False

        current_page = mbi.BaseAddress

        # The range ends at address + size (exclusive), so a page starting
        # exactly there is not part of it.
        while current_page < address + size:
            old_protection = ctypes.c_ulong(0)
            # Protect exactly one page per iteration.
            if not kernel32.VirtualProtectEx(
                    self.h_process, current_page, self.page_size,
                    mbi.Protect | my_debugger_defines.PAGE_GUARD,
                    ctypes.byref(old_protection)):
                return False
            self.guarded_pages.append(current_page)
            current_page += self.page_size

        self.memory_breakpoints[address] = (address, size, mbi)
        return True

    def func_resolve(self, dll, function):
        """Return the address of ``function`` (bytes) exported by ``dll``.

        The lookup runs in the debugger's own process and only works for a
        module that is already loaded there.  Returns None if the module or
        the export is not found.

        NOTE(review): callers use the result as an address in the debuggee,
        which presumes the DLL is mapped at the same base in both processes
        - confirm for the modules being resolved.
        """
        handle = kernel32.GetModuleHandleW(dll)
        if not handle:
            return None
        # GetModuleHandleW does not take a reference on the module and an
        # HMODULE is not a kernel object handle, so it is not closed here.
        return kernel32.GetProcAddress(handle, function)
- printf_loop.py
import ctypes
import time

# Debuggee used by the tutorial: calls msvcrt's printf every two seconds so
# a breakpoint set on printf is hit repeatedly.
msvcrt = ctypes.cdll.msvcrt
counter = 0

while True:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    time.sleep(2)
    counter += 1
- my_test.py
import my_debugger
from my_debugger_defines import *

# Attach to a running printf_loop.py process and put a memory breakpoint on
# msvcrt's printf.
debugger = my_debugger.debugger()

pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))

printf_address = debugger.func_resolve("msvcrt.dll", b"printf")
print("[*] Address of printf: 0x%08x" % printf_address)

print(debugger.bp_set_mem(printf_address, 10))

debugger.run()
debugger.detach()
完结完结!!!