【Gray Hat Python】构建自己的windows调试器

news/2024/11/28 20:49:31/
  • 环境准备
    windows10 64bit
    python3.7 64bit

打开可执行文件,创建进程

定义变量

以下代码用 ctypes 定义了调用 windows API 需要的结构

  • my_debugger_defines.py
import ctypes

# Win32 scalar type aliases used by the structures below.
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# The debugger calls the wide-character CreateProcessW, so the string
# pointers inside STARTUPINFO are wchar_t*, not char*.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
HANDLE = ctypes.c_void_p

# Process creation flags for CreateProcess().
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010


class STARTUPINFO(ctypes.Structure):
    """ctypes mirror of the Win32 STARTUPINFO structure (wide-char flavour).

    Field names follow the Win32 declaration exactly (``dwXCountChars``,
    ``dwYCountChars``, ``hStdInput``), so code ported from C can use them
    unchanged.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXCountChars", DWORD),
        ("dwYCountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdInput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes mirror of the Win32 PROCESS_INFORMATION structure.

    Filled in by CreateProcess() with the handles and IDs of the new
    process and its primary thread.
    """

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]

调用CreateProcess

值得注意的是,原版用的是 python2,其 str 是字节串(ANSI 编码),所以调用 CreateProcessA;这里使用的是 python3,其 str 是 Unicode 字符串,ctypes 会把它作为宽字符串(wchar_t*)传递,所以要使用 CreateProcessW 函数

  • my_debugger.py
import ctypes

import my_debugger_defines

# A private WinDLL instance with use_last_error=True lets ctypes capture the
# Win32 error code right after each call; calling kernel32.GetLastError()
# from Python afterwards may return a value overwritten by ctypes itself.
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
kernel32.CloseHandle.argtypes = [my_debugger_defines.HANDLE]


class debugger():
    """Minimal Windows debugger: launches an executable under debug control."""

    def __init__(self):
        pass

    def load(self, path_to_exe):
        """Start ``path_to_exe`` as a debuggee via CreateProcessW.

        Prints the new process ID on success, or the Win32 error code on
        failure. Returns None in both cases.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        # dwFlags = 0x1 makes wShowWindow effective; wShowWindow = 0x0 is
        # the value passed through to the new process's first window.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   ctypes.byref(startupinfo),
                                   ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
            # The handles returned in PROCESS_INFORMATION belong to us and
            # are not used afterwards; close them so they do not leak.
            kernel32.CloseHandle(process_information.hThread)
            kernel32.CloseHandle(process_information.hProcess)
        else:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())
  • my_test.py
import my_debugger

# Launch the target executable under the debugger.
dbg = my_debugger.debugger()
dbg.load("11.exe")

打开pid对应的进程

一些需要的结构体

  • my_debugger_defines.py
import ctypes

# Win32 scalar type aliases used by the structures below.
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# CreateProcessW is the wide-character API, so string pointers are wchar_t*.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer: 4 bytes in a 32-bit process, 8 bytes in a
# 64-bit one. c_ulong stays 4 bytes on 64-bit Windows and would make
# EXCEPTION_RECORD (and therefore DEBUG_EVENT) too small.
UINT_PTR = ctypes.c_size_t

# Process creation / access constants.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF

# Debug loop constants.
DBG_CONTINUE = 0x00010002
# "Wait forever" timeout for WaitForDebugEvent(); the Win32 value is
# 0xFFFFFFFF, not a copy of DBG_CONTINUE.
INFINITE = 0xFFFFFFFF


class STARTUPINFO(ctypes.Structure):
    """ctypes mirror of the Win32 STARTUPINFO structure (wide-char flavour).

    Field names follow the Win32 declaration exactly.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXCountChars", DWORD),
        ("dwYCountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdInput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes mirror of the Win32 PROCESS_INFORMATION structure."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """ctypes mirror of EXCEPTION_RECORD (fields assigned below)."""


# The structure refers to itself, so its fields are assigned after the class
# object exists.
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of a debug event whose code is EXCEPTION_DEBUG_EVENT."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Union of the per-event payloads carried by DEBUG_EVENT.

    Only the exception payload is declared; the remaining members are listed
    for reference and not implemented here.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll",           LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo",           RIP_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """ctypes mirror of DEBUG_EVENT, filled in by WaitForDebugEvent()."""

    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]

要实现附加,需要:
打开进程,获得进程的所有权限。
让进程等待调试事件发生。
调试事件发生,要处理调试事件(这里还没有处理调试事件)。
处理完调试事件,下一步继续执行。
解除附加。

如下代码,有比较详细的注释

  • my_debugger.py
import ctypes

import my_debugger_defines

# A private WinDLL instance with use_last_error=True lets ctypes capture the
# Win32 error code right after each call (read it with
# ctypes.get_last_error()).
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
# Handles are pointer-sized; declare them so ctypes does not treat them as
# C int.
kernel32.OpenProcess.restype = my_debugger_defines.HANDLE
kernel32.CloseHandle.argtypes = [my_debugger_defines.HANDLE]
kernel32.WaitForDebugEvent.argtypes = [
    ctypes.POINTER(my_debugger_defines.DEBUG_EVENT),
    my_debugger_defines.DWORD,
]


class debugger():
    """Minimal Windows debugger: launch or attach, wait for one event, detach."""

    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False

    def load(self, path_to_exe):
        """Start ``path_to_exe`` as a debuggee via CreateProcessW.

        On success records the pid, stores a process handle in
        ``self.h_process`` and marks the debugger active. On failure prints
        the Win32 error code.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   ctypes.byref(startupinfo),
                                   ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
            # Keep the pid and a handle of the new process for later use.
            self.pid = process_information.dwProcessId
            self.h_process = self.open_process(self.pid)
            self.debugger_active = True
            # The handles returned by CreateProcessW are not used after
            # this point; close them so they do not leak.
            kernel32.CloseHandle(process_information.hThread)
            kernel32.CloseHandle(process_information.hProcess)
        else:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())

    def open_process(self, pid):
        """Open process ``pid`` with PROCESS_ALL_ACCESS and return its handle."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid`` and enter the debug loop."""
        pid = int(pid)
        self.h_process = self.open_process(pid)

        # DebugActiveProcess attaches this debugger to the process.
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
            self.run()
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, pause for the user, then resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE

        # INFINITE: block until a debug event arrives.
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                      my_debugger_defines.INFINITE):
            # No event handling yet: stop the loop after the first event.
            input("press a key to continue...")
            self.debugger_active = False
            # Resume the thread that reported the event.
            kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                        debug_event.dwThreadId,
                                        continue_status)

    def detach(self):
        """Detach from the debuggee; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False
  • test.py
import my_debugger

dbg = my_debugger.debugger()

# Ask for the target PID, attach to it, then detach again.
target_pid = int(input("Enter the PID of the process to attach to:"))
dbg.attach(target_pid)
dbg.detach()

获取寄存器状态信息

这里值得说的是,define 文件中的 CONTEXT 必须与 Python 解释器的位数一致:64 位的 Python 要使用 64 位版本的 CONTEXT,32 位的 Python 则使用旧的(32 位)版本。如果在 64 位的调试器中使用旧版本,得到的 context 会全部为零。

  • my_debugger_defines.py
import ctypes

# Win32 scalar type aliases used by the structures below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# CreateProcessW is the wide-character API, so string pointers are wchar_t*.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer (c_ulong stays 4 bytes on 64-bit Windows).
UINT_PTR = ctypes.c_size_t

# True when the running interpreter is a 64-bit process.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8

# Process / thread access constants.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# Debug loop constants.
DBG_CONTINUE = 0x00010002
# "Wait forever" timeout for WaitForDebugEvent().
INFINITE = 0xFFFFFFFF

# Snapshot flag for CreateToolhelp32Snapshot().
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext(). The architecture bits differ between
# x86 (0x00010000) and x64 (0x00100000), so choose the set that matches the
# CONTEXT layout selected at the bottom of this file.
if _IS_64BIT:
    CONTEXT_FULL = 0x0010000B
    CONTEXT_DEBUG_REGISTERS = 0x00100010
else:
    CONTEXT_FULL = 0x00010007
    CONTEXT_DEBUG_REGISTERS = 0x00010010


class STARTUPINFO(ctypes.Structure):
    """ctypes mirror of the Win32 STARTUPINFO structure (wide-char flavour).

    Field names follow the Win32 declaration exactly.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXCountChars", DWORD),
        ("dwYCountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdInput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes mirror of the Win32 PROCESS_INFORMATION structure."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """ctypes mirror of EXCEPTION_RECORD (fields assigned below)."""


# The structure refers to itself, so its fields are assigned after the class
# object exists.
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of a debug event whose code is EXCEPTION_DEBUG_EVENT."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Union of the per-event payloads carried by DEBUG_EVENT.

    Only the exception payload is declared; the remaining members are listed
    for reference and not implemented here.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll",           LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo",           RIP_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """ctypes mirror of DEBUG_EVENT, filled in by WaitForDebugEvent()."""

    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    """One thread entry of a Toolhelp snapshot (Thread32First / Thread32Next)."""

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        # The two priority fields are signed LONGs in the Win32 declaration.
        ("tpBasePri", ctypes.c_long),
        ("tpDeltaPri", ctypes.c_long),
        ("dwFlags", DWORD),
    ]


class FLOATING_SAVE_AREA(ctypes.Structure):
    """x87 floating-point state embedded in the 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


class CONTEXT32(ctypes.Structure):
    """Thread context layout for a 32-bit (x86) debugger process."""

    _fields_ = [
        ("ContextFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("FloatSave", FLOATING_SAVE_AREA),
        ("SegGs", DWORD),
        ("SegFs", DWORD),
        ("SegEs", DWORD),
        ("SegDs", DWORD),
        ("Edi", DWORD),
        ("Esi", DWORD),
        ("Ebx", DWORD),
        ("Edx", DWORD),
        ("Ecx", DWORD),
        ("Eax", DWORD),
        ("Ebp", DWORD),
        ("Eip", DWORD),
        ("SegCs", DWORD),
        ("EFlags", DWORD),
        ("Esp", DWORD),
        ("SegSs", DWORD),
        ("ExtendedRegisters", BYTE * 512),
    ]


class M128A(ctypes.Structure):
    """128-bit value used for the SSE registers in the x64 CONTEXT."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Floating-point / SSE save area of the x64 CONTEXT (512 bytes)."""

    _fields_ = [
        # The FXSAVE image is 512 bytes; a smaller member would shift every
        # field that follows the union in CONTEXT64.
        ("FltSave", BYTE * 512),
        # Kept so code that accessed the old "S" member still works.
        ("S", DWORD * 32),
    ]


class CONTEXT64(ctypes.Structure):
    """Thread context layout for a 64-bit (x64) debugger process.

    The debug registers are 64 bits wide on x64; declaring them as DWORD
    would move Rax..Rip to the wrong offsets.
    NOTE(review): Windows expects this structure on a 16-byte boundary;
    ctypes does not guarantee that by itself -- confirm on the target build.
    """

    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


# GetThreadContext() needs the layout that matches the debugger's own
# bitness: the x64 layout under 64-bit Python, the x86 layout otherwise.
CONTEXT = CONTEXT64 if _IS_64BIT else CONTEXT32

这里有详细的注释

  • my_debugger.py
import ctypes

import my_debugger_defines

# A private WinDLL instance with use_last_error=True lets ctypes capture the
# Win32 error code right after each call (read it with
# ctypes.get_last_error()).
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

# Handles are pointer-sized; declare them so ctypes does not treat them as
# C int. With restype HANDLE a NULL handle comes back as None.
kernel32.OpenProcess.restype = my_debugger_defines.HANDLE
kernel32.OpenThread.restype = my_debugger_defines.HANDLE
kernel32.CreateToolhelp32Snapshot.restype = my_debugger_defines.HANDLE
kernel32.CloseHandle.argtypes = [my_debugger_defines.HANDLE]
kernel32.Thread32First.argtypes = [
    my_debugger_defines.HANDLE,
    ctypes.POINTER(my_debugger_defines.THREADENTRY32),
]
kernel32.Thread32Next.argtypes = kernel32.Thread32First.argtypes
kernel32.GetThreadContext.argtypes = [
    my_debugger_defines.HANDLE,
    ctypes.POINTER(my_debugger_defines.CONTEXT),
]
kernel32.WaitForDebugEvent.argtypes = [
    ctypes.POINTER(my_debugger_defines.DEBUG_EVENT),
    my_debugger_defines.DWORD,
]

# CreateToolhelp32Snapshot reports failure with this value, not with NULL.
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value


class debugger():
    """Minimal Windows debugger: launch/attach, enumerate threads, read registers."""

    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False

    def load(self, path_to_exe):
        """Start ``path_to_exe`` as a debuggee via CreateProcessW.

        On success records the pid, stores a process handle in
        ``self.h_process`` and marks the debugger active. On failure prints
        the Win32 error code.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   ctypes.byref(startupinfo),
                                   ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
            # Keep the pid and a handle of the new process for later use.
            self.pid = process_information.dwProcessId
            self.h_process = self.open_process(self.pid)
            self.debugger_active = True
            # The handles returned by CreateProcessW are not used after
            # this point; close them so they do not leak.
            kernel32.CloseHandle(process_information.hThread)
            kernel32.CloseHandle(process_information.hProcess)
        else:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())

    def open_process(self, pid):
        """Open process ``pid`` with PROCESS_ALL_ACCESS and return its handle."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid``.

        The debug loop is not started here; call ``run()`` separately.
        """
        pid = int(pid)
        self.h_process = self.open_process(pid)

        # DebugActiveProcess attaches this debugger to the process.
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, pause for the user, then resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE

        # INFINITE: block until a debug event arrives; WaitForDebugEvent
        # then fills in debug_event.
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                      my_debugger_defines.INFINITE):
            input("press a key to continue...")
            self.debugger_active = False
            # Resume the thread that reported the event.
            kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                        debug_event.dwThreadId,
                                        continue_status)

    def detach(self):
        """Detach from the debuggee; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for ``thread_id``, or False on failure."""
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS,
                                       False,
                                       thread_id)
        # OpenThread signals failure with a NULL handle (None or 0 here).
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the IDs of all threads owned by ``self.pid``.

        Returns False when the thread snapshot cannot be created.
        """
        # THREADENTRY32 receives the description of one thread at a time.
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []

        # With TH32CS_SNAPTHREAD the snapshot covers every thread in the
        # system (the pid argument is ignored for this flag), so each entry
        # is compared with our pid below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is None or snapshot == INVALID_HANDLE_VALUE:
            return False

        try:
            # dwSize must be set before the first call or it fails.
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        Pass either ``thread_id`` (a handle is opened and closed internally)
        or an already opened ``h_thread`` (left open for the caller).
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (my_debugger_defines.CONTEXT_FULL
                                | my_debugger_defines.CONTEXT_DEBUG_REGISTERS)

        opened_here = h_thread is None
        if opened_here:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                return False

        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            # Only close a handle this method opened itself.
            if opened_here:
                kernel32.CloseHandle(h_thread)
  • my_test.py
import my_debugger
# NOTE(review): win32con / win32api are not used by this script.
import win32con
import win32api

debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))

# enumerate_threads() returns False when the snapshot cannot be taken, so
# fall back to an empty list instead of iterating over a bool.
thread_ids = debugger.enumerate_threads() or []
for thread in thread_ids:
    thread_context = debugger.get_thread_context(thread)
    # get_thread_context() returns False when the context cannot be read.
    if not thread_context:
        print("[*] Could not read the context of thread ID: 0x%08x." % thread)
        continue
    print("[*] Dumping registers for thread ID: 0x%08x." % thread)
    print("[*] RIP 0x%08x" % thread_context.Rip)
    print("[*] RSP 0x%08x" % thread_context.Rsp)
    print("[*] RBP 0x%08x" % thread_context.Rbp)
    print("[*] RAX 0x%08x" % thread_context.Rax)
    print("[*] RBX 0x%08x" % thread_context.Rbx)
    print("[*] RCX 0x%08x" % thread_context.Rcx)
    print("[*] RDX 0x%08x" % thread_context.Rdx)

debugger.detach()

实现调试事件

  • my_debugger_defines.py
import ctypes

# Win32 scalar type aliases used by the structures below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
# CreateProcessW is the wide-character API, so string pointers are wchar_t*.
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer (c_ulong stays 4 bytes on 64-bit Windows).
UINT_PTR = ctypes.c_size_t

# True when the running interpreter is a 64-bit process.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8

# Process / thread access constants.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# Debug loop constants.
DBG_CONTINUE = 0x00010002
# "Wait forever" timeout for WaitForDebugEvent().
INFINITE = 0xFFFFFFFF

# Snapshot flag for CreateToolhelp32Snapshot().
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext(). The architecture bits differ between
# x86 (0x00010000) and x64 (0x00100000), so choose the set that matches the
# CONTEXT layout selected at the bottom of this file.
if _IS_64BIT:
    CONTEXT_FULL = 0x0010000B
    CONTEXT_DEBUG_REGISTERS = 0x00100010
else:
    CONTEXT_FULL = 0x00010007
    CONTEXT_DEBUG_REGISTERS = 0x00010010

# Debug event constants.
EXCEPTION_DEBUG_EVENT = 0x1

# Debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004


class STARTUPINFO(ctypes.Structure):
    """ctypes mirror of the Win32 STARTUPINFO structure (wide-char flavour).

    Field names follow the Win32 declaration exactly.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXCountChars", DWORD),
        ("dwYCountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdInput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes mirror of the Win32 PROCESS_INFORMATION structure."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """ctypes mirror of EXCEPTION_RECORD (fields assigned below)."""


# The structure refers to itself, so its fields are assigned after the class
# object exists.
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of a debug event whose code is EXCEPTION_DEBUG_EVENT."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Union of the per-event payloads carried by DEBUG_EVENT.

    Only the exception payload is declared; the remaining members are listed
    for reference and not implemented here.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll",           LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo",           RIP_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """ctypes mirror of DEBUG_EVENT, filled in by WaitForDebugEvent()."""

    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    """One thread entry of a Toolhelp snapshot (Thread32First / Thread32Next)."""

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        # The two priority fields are signed LONGs in the Win32 declaration.
        ("tpBasePri", ctypes.c_long),
        ("tpDeltaPri", ctypes.c_long),
        ("dwFlags", DWORD),
    ]


class FLOATING_SAVE_AREA(ctypes.Structure):
    """x87 floating-point state embedded in the 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


class CONTEXT32(ctypes.Structure):
    """Thread context layout for a 32-bit (x86) debugger process."""

    _fields_ = [
        ("ContextFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("FloatSave", FLOATING_SAVE_AREA),
        ("SegGs", DWORD),
        ("SegFs", DWORD),
        ("SegEs", DWORD),
        ("SegDs", DWORD),
        ("Edi", DWORD),
        ("Esi", DWORD),
        ("Ebx", DWORD),
        ("Edx", DWORD),
        ("Ecx", DWORD),
        ("Eax", DWORD),
        ("Ebp", DWORD),
        ("Eip", DWORD),
        ("SegCs", DWORD),
        ("EFlags", DWORD),
        ("Esp", DWORD),
        ("SegSs", DWORD),
        ("ExtendedRegisters", BYTE * 512),
    ]


class M128A(ctypes.Structure):
    """128-bit value used for the SSE registers in the x64 CONTEXT."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Floating-point / SSE save area of the x64 CONTEXT (512 bytes)."""

    _fields_ = [
        # The FXSAVE image is 512 bytes; a smaller member would shift every
        # field that follows the union in CONTEXT64.
        ("FltSave", BYTE * 512),
        # Kept so code that accessed the old "S" member still works.
        ("S", DWORD * 32),
    ]


class CONTEXT64(ctypes.Structure):
    """Thread context layout for a 64-bit (x64) debugger process.

    The debug registers are 64 bits wide on x64; declaring them as DWORD
    would move Rax..Rip to the wrong offsets.
    NOTE(review): Windows expects this structure on a 16-byte boundary;
    ctypes does not guarantee that by itself -- confirm on the target build.
    """

    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


# GetThreadContext() needs the layout that matches the debugger's own
# bitness: the x64 layout under 64-bit Python, the x86 layout otherwise.
CONTEXT = CONTEXT64 if _IS_64BIT else CONTEXT32
  • my_debugger.py
import ctypes

import my_debugger_defines

# A private WinDLL instance with use_last_error=True lets ctypes capture the
# Win32 error code right after each call (read it with
# ctypes.get_last_error()).
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

# Handles are pointer-sized; declare them so ctypes does not treat them as
# C int. With restype HANDLE a NULL handle comes back as None.
kernel32.OpenProcess.restype = my_debugger_defines.HANDLE
kernel32.OpenThread.restype = my_debugger_defines.HANDLE
kernel32.CreateToolhelp32Snapshot.restype = my_debugger_defines.HANDLE
kernel32.CloseHandle.argtypes = [my_debugger_defines.HANDLE]
kernel32.Thread32First.argtypes = [
    my_debugger_defines.HANDLE,
    ctypes.POINTER(my_debugger_defines.THREADENTRY32),
]
kernel32.Thread32Next.argtypes = kernel32.Thread32First.argtypes
kernel32.GetThreadContext.argtypes = [
    my_debugger_defines.HANDLE,
    ctypes.POINTER(my_debugger_defines.CONTEXT),
]
kernel32.WaitForDebugEvent.argtypes = [
    ctypes.POINTER(my_debugger_defines.DEBUG_EVENT),
    my_debugger_defines.DWORD,
]

# CreateToolhelp32Snapshot reports failure with this value, not with NULL.
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value

# dwDebugEventCode reported when the debuggee exits (Win32 value 5).
_EXIT_PROCESS_DEBUG_EVENT = 0x5


class debugger():
    """Minimal Windows debugger with a basic exception-event dispatcher."""

    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None

    def load(self, path_to_exe):
        """Start ``path_to_exe`` as a debuggee via CreateProcessW.

        On success records the pid, stores a process handle in
        ``self.h_process`` and marks the debugger active. On failure prints
        the Win32 error code.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   ctypes.byref(startupinfo),
                                   ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
            # Keep the pid and a handle of the new process for later use.
            self.pid = process_information.dwProcessId
            self.h_process = self.open_process(self.pid)
            self.debugger_active = True
            # The handles returned by CreateProcessW are not used after
            # this point; close them so they do not leak.
            kernel32.CloseHandle(process_information.hThread)
            kernel32.CloseHandle(process_information.hProcess)
        else:
            print("[*] Error: 0x%08x." % ctypes.get_last_error())

    def open_process(self, pid):
        """Open process ``pid`` with PROCESS_ALL_ACCESS and return its handle."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid``.

        The debug loop is not started here; call ``run()`` separately.
        """
        pid = int(pid)
        self.h_process = self.open_process(pid)

        # DebugActiveProcess attaches this debugger to the process.
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it, then resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE

        # INFINITE: block until a debug event arrives; WaitForDebugEvent
        # then fills in debug_event.
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                          my_debugger_defines.INFINITE):
            return

        # Capture the handle and register state of the reporting thread.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = (self.get_thread_context(h_thread=self.h_thread)
                        if self.h_thread else False)

        print("Event Code: %d Thread ID: %d" %
              (debug_event.dwDebugEventCode, debug_event.dwThreadId))

        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            self.exception = record.ExceptionCode
            self.exception_address = record.ExceptionAddress

            if self.exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif self.exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif self.exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                # Reported for memory breakpoints.
                print("Guard Page Access Detected.")
            elif self.exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                # Reported for hardware breakpoints.
                print("Single Stepping.")
        elif debug_event.dwDebugEventCode == _EXIT_PROCESS_DEBUG_EVENT:
            # The debuggee is gone: leave the loop instead of waiting
            # forever for events that can no longer arrive.
            self.debugger_active = False

        # Resume the thread that reported the event.
        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)

        # The per-event thread handle is no longer needed.
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Report a breakpoint exception and return the continue status."""
        print("[*] Inside the breakpoint handler.")
        # ExceptionAddress is a c_void_p field, which reads back as None
        # when the address is 0.
        print("Exception address: 0x%08x" % (self.exception_address or 0))
        return my_debugger_defines.DBG_CONTINUE

    def detach(self):
        """Detach from the debuggee; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for ``thread_id``, or False on failure."""
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS,
                                       False,
                                       thread_id)
        # OpenThread signals failure with a NULL handle (None or 0 here).
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the IDs of all threads owned by ``self.pid``.

        Returns False when the thread snapshot cannot be created.
        """
        # THREADENTRY32 receives the description of one thread at a time.
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []

        # With TH32CS_SNAPTHREAD the snapshot covers every thread in the
        # system (the pid argument is ignored for this flag), so each entry
        # is compared with our pid below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is None or snapshot == INVALID_HANDLE_VALUE:
            return False

        try:
            # dwSize must be set before the first call or it fails.
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        Pass either ``thread_id`` (a handle is opened and closed internally)
        or an already opened ``h_thread`` (left open for the caller).
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (my_debugger_defines.CONTEXT_FULL
                                | my_debugger_defines.CONTEXT_DEBUG_REGISTERS)

        opened_here = h_thread is None
        if opened_here:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                return False

        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            # Only close a handle this method opened itself.
            if opened_here:
                kernel32.CloseHandle(h_thread)
  • my_test.py
import my_debugger

dbg = my_debugger.debugger()

# Attach to the chosen process, run the debug loop, then detach.
target_pid = int(input("Enter the PID of the process to attach to:"))
dbg.attach(target_pid)
dbg.run()
dbg.detach()

要学习的内容:

DEBUG_EVENT 结构中的联合体和 dwDebugEventCode 的关系:
(此处原文为一张示意图。)
get_debug_event 函数中实现了当 dwDebugEventCode 等于 0x1(EXCEPTION_DEBUG_EVENT)的情况。

值得说的是:在 DEBUG_EVENT 联合体中,Exception.ExceptionRecord.ExceptionCode 有很多取值,其中一个是 EXCEPTION_GUARD_PAGE,但这个取值在微软学习手册里没有找到,在 VS 中的 minwinbase.h 头文件中有定义

#define EXCEPTION_GUARD_PAGE STATUS_GUARD_PAGE_VIOLATION

软断点(INT3)

这个节坑太多,原文中显示源代码和运行的都不一样。需要看源代码文件去慢慢领悟。
还有就是迁移到python3有很多需要注意的地方,例如语法不同、ctypes调用API的方法不同

  • my_debugger_defines.py
import ctypes

# Win32 scalar type aliases used by the structures below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer (c_ulong stays 4 bytes on 64-bit Windows).
UINT_PTR = ctypes.c_size_t

# True when the running interpreter is a 64-bit process.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8

# Process / thread access constants.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# Debug loop constants.
DBG_CONTINUE = 0x00010002
# "Wait forever" timeout for WaitForDebugEvent().
INFINITE = 0xFFFFFFFF

# Snapshot flag for CreateToolhelp32Snapshot().
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext(). The architecture bits differ between
# x86 (0x00010000) and x64 (0x00100000), so choose the set that matches the
# CONTEXT layout selected at the bottom of this file.
if _IS_64BIT:
    CONTEXT_FULL = 0x0010000B
    CONTEXT_DEBUG_REGISTERS = 0x00100010
else:
    CONTEXT_FULL = 0x00010007
    CONTEXT_DEBUG_REGISTERS = 0x00010010

# Debug event constants.
EXCEPTION_DEBUG_EVENT = 0x1

# Debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004

# Memory page permissions, used by VirtualProtect().
PAGE_EXECUTE_READWRITE = 0x00000040


class STARTUPINFO(ctypes.Structure):
    """ctypes mirror of the Win32 STARTUPINFO structure (wide-char flavour).

    Field names follow the Win32 declaration exactly.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXCountChars", DWORD),
        ("dwYCountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdInput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes mirror of the Win32 PROCESS_INFORMATION structure."""

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """ctypes mirror of EXCEPTION_RECORD (fields assigned below)."""


# The structure refers to itself, so its fields are assigned after the class
# object exists.
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Payload of a debug event whose code is EXCEPTION_DEBUG_EVENT."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Union of the per-event payloads carried by DEBUG_EVENT.

    Only the exception payload is declared; the remaining members are listed
    for reference and not implemented here.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll",           LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo",           RIP_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """ctypes mirror of DEBUG_EVENT, filled in by WaitForDebugEvent()."""

    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    """One thread entry of a Toolhelp snapshot (Thread32First / Thread32Next)."""

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        # The two priority fields are signed LONGs in the Win32 declaration.
        ("tpBasePri", ctypes.c_long),
        ("tpDeltaPri", ctypes.c_long),
        ("dwFlags", DWORD),
    ]


class FLOATING_SAVE_AREA(ctypes.Structure):
    """x87 floating-point state embedded in the 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


class CONTEXT32(ctypes.Structure):
    """Thread context layout for a 32-bit (x86) debugger process."""

    _fields_ = [
        ("ContextFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("FloatSave", FLOATING_SAVE_AREA),
        ("SegGs", DWORD),
        ("SegFs", DWORD),
        ("SegEs", DWORD),
        ("SegDs", DWORD),
        ("Edi", DWORD),
        ("Esi", DWORD),
        ("Ebx", DWORD),
        ("Edx", DWORD),
        ("Ecx", DWORD),
        ("Eax", DWORD),
        ("Ebp", DWORD),
        ("Eip", DWORD),
        ("SegCs", DWORD),
        ("EFlags", DWORD),
        ("Esp", DWORD),
        ("SegSs", DWORD),
        ("ExtendedRegisters", BYTE * 512),
    ]


class M128A(ctypes.Structure):
    """128-bit value used for the SSE registers in the x64 CONTEXT."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Floating-point / SSE save area of the x64 CONTEXT (512 bytes)."""

    _fields_ = [
        # The FXSAVE image is 512 bytes; a smaller member would shift every
        # field that follows the union in CONTEXT64.
        ("FltSave", BYTE * 512),
        # Kept so code that accessed the old "S" member still works.
        ("S", DWORD * 32),
    ]


class CONTEXT64(ctypes.Structure):
    """Thread context layout for a 64-bit (x64) debugger process.

    The debug registers are 64 bits wide on x64; declaring them as DWORD
    would move Rax..Rip to the wrong offsets.
    NOTE(review): Windows expects this structure on a 16-byte boundary;
    ctypes does not guarantee that by itself -- confirm on the target build.
    """

    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


# GetThreadContext() needs the layout that matches the debugger's own
# bitness: the x64 layout under 64-bit Python, the x86 layout otherwise.
CONTEXT = CONTEXT64 if _IS_64BIT else CONTEXT32
LPCONTEXT = ctypes.POINTER(CONTEXT)
  • my_debugger.py
import ctypes

import my_debugger_defines

kernel32 = ctypes.windll.kernel32

# Value CreateToolhelp32Snapshot returns on failure: (HANDLE)-1.
_INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value


def _declare_prototypes():
    """Declare kernel32 argument/return types once.

    Without explicit prototypes ctypes treats every argument and return
    value as a C int, which truncates handles, pointers and SIZE_T values
    in a 64-bit process.
    """
    handle = ctypes.c_void_p
    pointer = ctypes.c_void_p
    dword = my_debugger_defines.DWORD
    win_bool = ctypes.c_int
    size_t = ctypes.c_size_t
    p_size_t = ctypes.POINTER(ctypes.c_size_t)
    p_context = my_debugger_defines.LPCONTEXT
    p_event = ctypes.POINTER(my_debugger_defines.DEBUG_EVENT)
    p_thread_entry = ctypes.POINTER(my_debugger_defines.THREADENTRY32)

    prototypes = {
        "OpenProcess": (handle, [dword, win_bool, dword]),
        "OpenThread": (handle, [dword, win_bool, dword]),
        "CloseHandle": (win_bool, [handle]),
        "DebugActiveProcess": (win_bool, [dword]),
        "DebugActiveProcessStop": (win_bool, [dword]),
        "WaitForDebugEvent": (win_bool, [p_event, dword]),
        "ContinueDebugEvent": (win_bool, [dword, dword, dword]),
        "GetThreadContext": (win_bool, [handle, p_context]),
        "SetThreadContext": (win_bool, [handle, p_context]),
        "CreateToolhelp32Snapshot": (handle, [dword, dword]),
        "Thread32First": (win_bool, [handle, p_thread_entry]),
        "Thread32Next": (win_bool, [handle, p_thread_entry]),
        "ReadProcessMemory": (win_bool, [handle, pointer, pointer, size_t, p_size_t]),
        "WriteProcessMemory": (win_bool, [handle, pointer, pointer, size_t, p_size_t]),
        "VirtualProtectEx": (win_bool, [handle, pointer, size_t, dword, ctypes.POINTER(dword)]),
        "GetModuleHandleW": (handle, [ctypes.c_wchar_p]),
        "GetProcAddress": (pointer, [handle, ctypes.c_char_p]),
    }
    for name, (restype, argtypes) in prototypes.items():
        function = getattr(kernel32, name)
        function.restype = restype
        function.argtypes = argtypes


_declare_prototypes()


class debugger():
    """Minimal user-mode debugger with software (INT3) breakpoints."""

    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # Software breakpoints: address -> original byte at that address.
        self.breakpoints = {}
        self.first_breakpoint = True

    def load(self, path_to_exe):
        """Start ``path_to_exe`` under the debugger (DEBUG_PROCESS)."""
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        created = kernel32.CreateProcessW(
            path_to_exe, None, None, None, None, creation_flags, None, None,
            ctypes.byref(startupinfo), ctypes.byref(process_information))
        if not created:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Record the debuggee so run(), enumerate_threads() and detach() work
        # after load() exactly as they do after attach().
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(self.pid)
        self.debugger_active = True
        # The handles returned by CreateProcessW are not used afterwards.
        kernel32.CloseHandle(process_information.hThread)
        kernel32.CloseHandle(process_information.hProcess)

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for ``pid`` (None on failure)."""
        return kernel32.OpenProcess(
            my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid`` with DebugActiveProcess."""
        pid = int(pid)
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Process debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it, and resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        if not kernel32.WaitForDebugEvent(
                ctypes.byref(debug_event), my_debugger_defines.INFINITE):
            return

        # Keep the thread handle open for the whole event so the exception
        # handlers can read and write the thread context through it.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d"
              % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            self.exception_address = record.ExceptionAddress
            if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                print("Guard Page Access Detected.")
            elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                print("Single Stepping.")

        kernel32.ContinueDebugEvent(
            debug_event.dwProcessId, debug_event.dwThreadId, continue_status)
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle EXCEPTION_BREAKPOINT and return the continue status."""
        print("[*] Exception address: 0x%08x" % (self.exception_address or 0))
        continue_status = my_debugger_defines.DBG_CONTINUE

        if self.exception_address not in self.breakpoints:
            # Not one of ours; the first such hit is the breakpoint Windows
            # raises when the debugger attaches.
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
            return continue_status

        print("[*] Hit user defined breakpoint.")
        # Put the original byte back over the INT3.
        self.write_process_memory(
            self.exception_address, self.breakpoints[self.exception_address])
        # Rewind RIP by one byte so the restored instruction is executed.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context:
            self.context.Rip -= 1
            kernel32.SetThreadContext(self.h_thread, ctypes.byref(self.context))
            self.debugger_active = True
        else:
            self.debugger_active = False
        return continue_status

    def detach(self):
        """Stop debugging the attached process; return True on success."""
        if self.pid is not None and kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for ``thread_id`` or False."""
        h_thread = kernel32.OpenThread(
            my_debugger_defines.THREAD_ALL_ACCESS, False, thread_id)
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the IDs of the threads owned by ``self.pid``.

        Returns False when the thread snapshot cannot be created.
        """
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # TH32CS_SNAPTHREAD snapshots every thread in the system, so each
        # entry is filtered by its owner process ID below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid or 0)
        if snapshot is None or snapshot == _INVALID_HANDLE_VALUE:
            return False

        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
        while success:
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        Pass either an open ``h_thread`` (left open for the caller) or a
        ``thread_id`` (a temporary handle is opened and closed here).
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (my_debugger_defines.CONTEXT_FULL
                                | my_debugger_defines.CONTEXT_DEBUG_REGISTERS)

        opened_here = h_thread is None
        if opened_here:
            if thread_id is None:
                return False
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                return False
        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            # Only close handles this call opened; the caller still needs its own.
            if opened_here:
                kernel32.CloseHandle(h_thread)

    def read_process_memory(self, address, length):
        """Read ``length`` bytes at ``address``; return bytes or False."""
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_size_t(0)
        if not kernel32.ReadProcessMemory(
                self.h_process, address, read_buf, length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes ``data`` at ``address``; return True on success."""
        count = ctypes.c_size_t(0)
        c_data = ctypes.c_char_p(data)
        return bool(kernel32.WriteProcessMemory(
            self.h_process, address, c_data, len(data), ctypes.byref(count)))

    def bp_set(self, address):
        """Install an INT3 breakpoint at ``address``.

        Returns True when a new breakpoint was written, False when one is
        already registered there or the debuggee memory could not be
        read or written.
        """
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return False

        # Make the page writable before patching it.
        old_protect = my_debugger_defines.DWORD(0)
        kernel32.VirtualProtectEx(
            self.h_process, address, 1,
            my_debugger_defines.PAGE_EXECUTE_READWRITE,
            ctypes.byref(old_protect))

        # Save the original byte so it can be restored when the breakpoint hits.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def func_resolve(self, dll, function):
        """Return the address of ``function`` (bytes) in ``dll``, or None.

        The module is looked up in the debugger's own process.  The handle
        from GetModuleHandleW is not reference counted, so it is not closed.
        """
        handle = kernel32.GetModuleHandleW(dll)
        if not handle:
            return None
        return kernel32.GetProcAddress(handle, function)
  • my_test.py
import my_debugger

# Attach to the process whose PID the user types in.
debugger = my_debugger.debugger()
debugger.attach(int(input("Enter the PID of the process to attach to:")))

# Resolve printf and plant a software breakpoint on it.
printf_address = debugger.func_resolve("msvcrt.dll", b"printf")
print("[*] Address of printf: 0x%08x" % printf_address)
debugger.bp_set(printf_address)

# Pump debug events, then detach once the loop ends.
debugger.run()
debugger.detach()
  • printf_loop.py

这个是被调试程序

import ctypes
import time

# Debuggee: calls msvcrt's printf every two seconds, forever.
msvcrt = ctypes.cdll.msvcrt
counter = 0

while 1:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    time.sleep(2)
    counter = counter + 1

运行结果正好符合预期

Event Code: 6 Thread ID: 18744
Event Code: 6 Thread ID: 18744
Event Code: 6 Thread ID: 18744
Event Code: 2 Thread ID: 18880
Event Code: 1 Thread ID: 18880
[*] Exception address: 0x7ffe68e10b10
[*] Hit the first breakpoint.
Event Code: 4 Thread ID: 18880
Event Code: 1 Thread ID: 18744
[*] Exception address: 0x7ffe68b88b50
[*] Hit user defined breakpoint.
[*] Finished debugging. Exiting...

硬件断点

这里的 my_debugger_defines.py 修改了 CONTEXT 结构体:之前把 Dr0~Dr7 定义成了 DWORD(32 位),在软断点阶段没有暴露问题,但设置硬件断点时会出错,因此这里改成了 DWORD64。

  • my_debugger_defines.py
import ctypes

# Win32 type aliases.  DWORD relies on c_ulong being 32 bits wide, which is
# the case on Windows.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer (8 bytes in a 64-bit process).
UINT_PTR = ctypes.c_size_t

# Process creation / access.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF

# Continue statuses for ContinueDebugEvent().
DBG_CONTINUE = 0x00010002
DBG_EXCEPTION_NOT_HANDLED = 0x80010001

# Timeout meaning "wait forever" for WaitForDebugEvent().
INFINITE = 0xFFFFFFFF
THREAD_ALL_ACCESS = 0x001F03FF

# snapshot
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext()
# NOTE(review): these are the x86 values of the CONTEXT_* flags; the AMD64
# header defines them with a different architecture bit - confirm before
# requesting additional parts of the context.
CONTEXT_FULL = 0x00010007
CONTEXT_DEBUG_REGISTERS = 0x00010010

# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004

# Memory page permissions, used by VirtualProtect()
PAGE_EXECUTE_READWRITE = 0x00000040

# Hardware breakpoint conditions
HW_ACCESS = 0x00000003
HW_EXECUTE = 0x00000000
HW_WRITE = 0x00000001


# Structures required by CreateProcess.
# typedef struct _STARTUPINFO {
#     DWORD cb;
#     LPTSTR lpReserved;
#     LPTSTR lpDesktop;
#     LPTSTR lpTitle;
#     DWORD dwX;
#     DWORD dwY;
#     DWORD dwXSize;
#     DWORD dwYSize;
#     DWORD dwXCountChars;
#     DWORD dwYCountChars;
#     DWORD dwFillAttribute;
#     DWORD dwFlags;
#     WORD wShowWindow;
#     WORD cbReserved2;
#     LPBYTE lpReserved2;
#     HANDLE hStdInput;
#     HANDLE hStdOutput;
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;
class STARTUPINFO(ctypes.Structure):
    # NOTE: "dwXcountChars", "dwYcountChars" and "hStdIput" are spelt
    # differently from the typedef above; the names are kept so existing
    # users keep working, and the memory layout is unaffected.
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


# typedef struct _PROCESS_INFORMATION {
#     HANDLE hProcess;
#     HANDLE hThread;
#     DWORD dwProcessId;
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;
class PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    pass


# Assigned after the class statement because the record points to itself.
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    # Only the exception member is declared; the other event payloads are
    # not used by the debugger.
    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll",           LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo",           RIP_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]


class FLOATING_SAVE_AREA(ctypes.Structure):
    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


# 32-bit (x86) CONTEXT layout, kept for reference only:
# class CONTEXT(ctypes.Structure):
#     _fields_ = [
#         ("ContextFlags", DWORD),
#         ("Dr0", DWORD),
#         ("Dr1", DWORD),
#         ("Dr2", DWORD),
#         ("Dr3", DWORD),
#         ("Dr6", DWORD),
#         ("Dr7", DWORD),
#         ("FloatSave", FLOATING_SAVE_AREA),
#         ("SegGs", DWORD),
#         ("SegFs", DWORD),
#         ("SegEs", DWORD),
#         ("SegDs", DWORD),
#         ("Edi", DWORD),
#         ("Esi", DWORD),
#         ("Ebx", DWORD),
#         ("Edx", DWORD),
#         ("Ecx", DWORD),
#         ("Eax", DWORD),
#         ("Ebp", DWORD),
#         ("Eip", DWORD),
#         ("SegCs", DWORD),
#         ("EFlags", DWORD),
#         ("Esp", DWORD),
#         ("SegSs", DWORD),
#         ("ExtendedRegisters", BYTE * 512),
# ]

DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong


class M128A(ctypes.Structure):
    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    # The real member is a 512-byte XMM_SAVE_AREA32.  "S" is kept for
    # compatibility; "FltSave" pads the union to its full size so the
    # fields after it in CONTEXT sit at their real offsets.
    _fields_ = [
        ("S", DWORD * 32),
        ("FltSave", BYTE * 512),
    ]


class CONTEXT(ctypes.Structure):
    # 64-bit (AMD64) thread context; field order follows winnt.h.
    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


LPCONTEXT = ctypes.POINTER(CONTEXT)
  • my_debugger.py
import ctypes

import my_debugger_defines

kernel32 = ctypes.windll.kernel32

# Value CreateToolhelp32Snapshot returns on failure: (HANDLE)-1.
_INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value


def _declare_prototypes():
    """Declare kernel32 argument/return types once.

    Without explicit prototypes ctypes treats every argument and return
    value as a C int, which truncates handles, pointers and SIZE_T values
    in a 64-bit process.
    """
    handle = ctypes.c_void_p
    pointer = ctypes.c_void_p
    dword = my_debugger_defines.DWORD
    win_bool = ctypes.c_int
    size_t = ctypes.c_size_t
    p_size_t = ctypes.POINTER(ctypes.c_size_t)
    p_context = my_debugger_defines.LPCONTEXT
    p_event = ctypes.POINTER(my_debugger_defines.DEBUG_EVENT)
    p_thread_entry = ctypes.POINTER(my_debugger_defines.THREADENTRY32)

    prototypes = {
        "OpenProcess": (handle, [dword, win_bool, dword]),
        "OpenThread": (handle, [dword, win_bool, dword]),
        "CloseHandle": (win_bool, [handle]),
        "DebugActiveProcess": (win_bool, [dword]),
        "DebugActiveProcessStop": (win_bool, [dword]),
        "WaitForDebugEvent": (win_bool, [p_event, dword]),
        "ContinueDebugEvent": (win_bool, [dword, dword, dword]),
        "GetThreadContext": (win_bool, [handle, p_context]),
        "SetThreadContext": (win_bool, [handle, p_context]),
        "CreateToolhelp32Snapshot": (handle, [dword, dword]),
        "Thread32First": (win_bool, [handle, p_thread_entry]),
        "Thread32Next": (win_bool, [handle, p_thread_entry]),
        "ReadProcessMemory": (win_bool, [handle, pointer, pointer, size_t, p_size_t]),
        "WriteProcessMemory": (win_bool, [handle, pointer, pointer, size_t, p_size_t]),
        "VirtualProtectEx": (win_bool, [handle, pointer, size_t, dword, ctypes.POINTER(dword)]),
        "GetModuleHandleW": (handle, [ctypes.c_wchar_p]),
        "GetProcAddress": (pointer, [handle, ctypes.c_char_p]),
    }
    for name, (restype, argtypes) in prototypes.items():
        function = getattr(kernel32, name)
        function.restype = restype
        function.argtypes = argtypes


_declare_prototypes()


class debugger():
    """Minimal user-mode debugger with software and hardware breakpoints."""

    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # Software breakpoints: address -> original byte at that address.
        self.breakpoints = {}
        self.first_breakpoint = True
        # Hardware breakpoints: slot (0-3) -> (address, length, condition).
        self.hardware_breakpoints = {}

    def load(self, path_to_exe):
        """Start ``path_to_exe`` under the debugger (DEBUG_PROCESS)."""
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        created = kernel32.CreateProcessW(
            path_to_exe, None, None, None, None, creation_flags, None, None,
            ctypes.byref(startupinfo), ctypes.byref(process_information))
        if not created:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Record the debuggee so run(), enumerate_threads() and detach() work
        # after load() exactly as they do after attach().
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(self.pid)
        self.debugger_active = True
        # The handles returned by CreateProcessW are not used afterwards.
        kernel32.CloseHandle(process_information.hThread)
        kernel32.CloseHandle(process_information.hProcess)

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for ``pid`` (None on failure)."""
        return kernel32.OpenProcess(
            my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid`` with DebugActiveProcess."""
        pid = int(pid)
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Process debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it, and resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        if not kernel32.WaitForDebugEvent(
                ctypes.byref(debug_event), my_debugger_defines.INFINITE):
            return

        # Keep the thread handle open for the whole event so the exception
        # handlers can read and write the thread context through it.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d"
              % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            self.exception_address = record.ExceptionAddress
            if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                print("Guard Page Access Detected.")
            elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                print("Single Stepping.")
                # Use the handler's verdict so a single-step exception that
                # is not ours is passed on to the debuggee.
                continue_status = self.exception_handler_single_step()

        kernel32.ContinueDebugEvent(
            debug_event.dwProcessId, debug_event.dwThreadId, continue_status)
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle EXCEPTION_BREAKPOINT and return the continue status."""
        print("[*] Exception address: 0x%08x" % (self.exception_address or 0))
        continue_status = my_debugger_defines.DBG_CONTINUE

        if self.exception_address not in self.breakpoints:
            # Not one of ours; the first such hit is the breakpoint Windows
            # raises when the debugger attaches.
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
            return continue_status

        print("[*] Hit user defined breakpoint.")
        # Put the original byte back over the INT3.
        self.write_process_memory(
            self.exception_address, self.breakpoints[self.exception_address])
        # Rewind RIP by one byte so the restored instruction is executed.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context:
            self.context.Rip -= 1
            kernel32.SetThreadContext(self.h_thread, ctypes.byref(self.context))
            self.debugger_active = True
        else:
            self.debugger_active = False
        return continue_status

    def exception_handler_single_step(self):
        """Handle EXCEPTION_SINGLE_STEP and return the continue status.

        The low four bits of DR6 identify which debug register fired.  If
        the matching slot holds one of our hardware breakpoints it is
        removed and DBG_CONTINUE is returned; otherwise the exception is
        reported as not handled.
        """
        dr6 = self.context.Dr6 if self.context else 0
        slot = None
        for candidate in range(4):
            if dr6 & (1 << candidate) and candidate in self.hardware_breakpoints:
                slot = candidate
                break
        if slot is None:
            return my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED

        if self.bp_del_hw(slot):
            print("[*] Hardware breakpoint removed.")
            return my_debugger_defines.DBG_CONTINUE
        return my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED

    def bp_del_hw(self, slot):
        """Remove the hardware breakpoint in ``slot`` from every thread."""
        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            try:
                context = self.get_thread_context(h_thread=h_thread)
                if not context:
                    continue
                # Clear the local-enable bit for this slot.
                context.Dr7 &= ~(1 << (slot * 2))
                # Zero the breakpoint address register (Dr0..Dr3).
                setattr(context, "Dr%d" % slot, 0)
                # Clear the condition bits, then the length bits.
                context.Dr7 &= ~(3 << ((slot * 4) + 16))
                context.Dr7 &= ~(3 << ((slot * 4) + 18))
                kernel32.SetThreadContext(h_thread, ctypes.byref(context))
            finally:
                kernel32.CloseHandle(h_thread)

        self.hardware_breakpoints.pop(slot, None)
        return True

    def detach(self):
        """Stop debugging the attached process; return True on success."""
        if self.pid is not None and kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for ``thread_id`` or False."""
        h_thread = kernel32.OpenThread(
            my_debugger_defines.THREAD_ALL_ACCESS, False, thread_id)
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the IDs of the threads owned by ``self.pid``.

        Returns False when the thread snapshot cannot be created.
        """
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # TH32CS_SNAPTHREAD snapshots every thread in the system, so each
        # entry is filtered by its owner process ID below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid or 0)
        if snapshot is None or snapshot == _INVALID_HANDLE_VALUE:
            return False

        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
        while success:
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        Pass either an open ``h_thread`` (left open for the caller) or a
        ``thread_id`` (a temporary handle is opened and closed here).
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (my_debugger_defines.CONTEXT_FULL
                                | my_debugger_defines.CONTEXT_DEBUG_REGISTERS)

        opened_here = h_thread is None
        if opened_here:
            if thread_id is None:
                return False
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                return False
        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            # Only close handles this call opened; the caller still needs its own.
            if opened_here:
                kernel32.CloseHandle(h_thread)

    def read_process_memory(self, address, length):
        """Read ``length`` bytes at ``address``; return bytes or False."""
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_size_t(0)
        if not kernel32.ReadProcessMemory(
                self.h_process, address, read_buf, length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes ``data`` at ``address``; return True on success."""
        count = ctypes.c_size_t(0)
        c_data = ctypes.c_char_p(data)
        return bool(kernel32.WriteProcessMemory(
            self.h_process, address, c_data, len(data), ctypes.byref(count)))

    def bp_set(self, address):
        """Install an INT3 breakpoint at ``address``.

        Returns True when a new breakpoint was written, False when one is
        already registered there or the debuggee memory could not be
        read or written.
        """
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return False

        # Make the page writable before patching it.
        old_protect = my_debugger_defines.DWORD(0)
        kernel32.VirtualProtectEx(
            self.h_process, address, 1,
            my_debugger_defines.PAGE_EXECUTE_READWRITE,
            ctypes.byref(old_protect))

        # Save the original byte so it can be restored when the breakpoint hits.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def bp_set_hw(self, address, length, condition):
        """Set a hardware breakpoint on every thread of the debuggee.

        ``length`` must be 1, 2 or 4 bytes and ``condition`` one of
        HW_ACCESS, HW_EXECUTE or HW_WRITE.  Returns False when an argument
        is invalid or all four debug registers are in use, True otherwise.
        """
        if length not in (1, 2, 4):
            return False
        # DR7 stores the length as (size - 1).
        length -= 1

        if condition not in (my_debugger_defines.HW_ACCESS,
                             my_debugger_defines.HW_EXECUTE,
                             my_debugger_defines.HW_WRITE):
            return False

        # Pick the first free debug register slot.
        available = next(
            (slot for slot in range(4) if slot not in self.hardware_breakpoints),
            None)
        if available is None:
            return False

        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            try:
                context = self.get_thread_context(h_thread=h_thread)
                if not context:
                    continue
                # Enable the slot in DR7 and store the address in Dr0..Dr3.
                context.Dr7 |= 1 << (available * 2)
                setattr(context, "Dr%d" % available, address)
                # Trigger condition, then length.
                context.Dr7 |= condition << ((available * 4) + 16)
                context.Dr7 |= length << ((available * 4) + 18)
                kernel32.SetThreadContext(h_thread, ctypes.byref(context))
            finally:
                kernel32.CloseHandle(h_thread)

        self.hardware_breakpoints[available] = (address, length, condition)
        return True

    def func_resolve(self, dll, function):
        """Return the address of ``function`` (bytes) in ``dll``, or None.

        The module is looked up in the debugger's own process.  The handle
        from GetModuleHandleW is not reference counted, so it is not closed.
        """
        handle = kernel32.GetModuleHandleW(dll)
        if not handle:
            return None
        return kernel32.GetProcAddress(handle, function)
  • printf_loop.py
import ctypes
import time

# Debuggee: calls msvcrt's printf every two seconds, forever.
msvcrt = ctypes.cdll.msvcrt
counter = 0

while 1:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    time.sleep(2)
    counter = counter + 1
  • my_test.py
import my_debugger
from my_debugger_defines import *

# Attach to the process whose PID the user types in.
debugger = my_debugger.debugger()
debugger.attach(int(input("Enter the PID of the process to attach to:")))

# Resolve printf and arm a one-byte hardware execution breakpoint on it.
printf_address = debugger.func_resolve("msvcrt.dll", b"printf")
print("[*] Address of printf: 0x%08x" % printf_address)
debugger.bp_set_hw(printf_address, 1, HW_EXECUTE)

# Pump debug events, then detach once the loop ends.
debugger.run()
debugger.detach()

运行结果(这里显示了一些调试的print)

Event Code: 2 Thread ID: 9928
Event Code: 1 Thread ID: 9928
0x80000003
[*] Exception address: 0x7ffe68e10b10
[*] Hit the first breakpoint.
Event Code: 4 Thread ID: 9928
Event Code: 1 Thread ID: 18456
0x80000004
Single Stepping.
[*] Hardware breakpoint removed.
Event Code: 2 Thread ID: 21584

内存断点

内存断点本质上不是真正的断点。当一个调试器设置一个内存断点时,调试器实质上所做的是改变一个内存区域或一个内存页的访问权限。内存页是操作系统可以一次处理的最小内存块。操作系统每分配一个内存页时,都会为这个内存页设置访问权限,该权限决定了这个内存页被访问的方式。以下是几个不同的内存页访问权限的例子:

  • 页可执行:允许进程执行页上的代码,但是如果进程试图读写这个页将导致非法内存操作异常;
  • 页可读:进程只能从这个内存页中读取数据;任何企图写入数据或者执行代码的操作会导致非法内存操作异常;
  • 页可写:允许进程在这个内存页上写入数据;
  • 保护页:对保护页任何类型的访问将导致一次性异常, 之后这个内存页会恢复到之前的状态。

主要过程

VirtualQueryEx :找内存断点区域的内存页
VirtualProtectEx :修改内存页属性为保护页

  • my_debugger_defines.py
import ctypesBYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
LPVOID    = ctypes.c_void_p
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
UINT_PTR  = ctypes.c_ulong
SIZE_T    = ctypes.c_ulongDEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFFDBG_CONTINUE = 0x00010002
DBG_EXCEPTION_NOT_HANDLED = 0x80010001INFINITE = 0x00010002
THREAD_ALL_ACCESS = 0x001F03FF# snapshot
TH32CS_SNAPTHREAD   = 0x00000004# Context flags for GetThreadContext()
CONTEXT_FULL                   = 0x00010007
CONTEXT_DEBUG_REGISTERS        = 0x00010010# Debug event constants
EXCEPTION_DEBUG_EVENT      =    0x1# debug exception codes.
EXCEPTION_ACCESS_VIOLATION     = 0xC0000005
EXCEPTION_BREAKPOINT           = 0x80000003
EXCEPTION_GUARD_PAGE           = 0x80000001
EXCEPTION_SINGLE_STEP          = 0x80000004# Memory page permissions, used by VirtualProtect()
PAGE_EXECUTE_READWRITE         = 0x00000040# Hardware breakpoint conditions
HW_ACCESS                      = 0x00000003
HW_EXECUTE                     = 0x00000000
HW_WRITE                       = 0x00000001# Memory page permissions, used by VirtualProtect()
PAGE_NOACCESS                  = 0x00000001
PAGE_READONLY                  = 0x00000002
PAGE_READWRITE                 = 0x00000004
PAGE_WRITECOPY                 = 0x00000008
PAGE_EXECUTE                   = 0x00000010
PAGE_EXECUTE_READ              = 0x00000020
PAGE_EXECUTE_READWRITE         = 0x00000040
PAGE_EXECUTE_WRITECOPY         = 0x00000080
PAGE_GUARD                     = 0x00000100
PAGE_NOCACHE                   = 0x00000200
PAGE_WRITECOMBINE              = 0x00000400# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {  
#     DWORD cb;  
#     LPTSTR lpReserved;  
#     LPTSTR lpDesktop;  
#     LPTSTR lpTitle;  
#     DWORD dwX;  
#     DWORD dwY;  
#     DWORD dwXSize;  
#     DWORD dwYSize;  
#     DWORD dwXCountChars;  
#     DWORD dwYCountChars;  
#     DWORD dwFillAttribute;  
#     DWORD dwFlags;  
#     WORD wShowWindow;  
#     WORD cbReserved2;  
#     LPBYTE lpReserved2;  
#     HANDLE hStdInput;  
#     HANDLE hStdOutput;  
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;class STARTUPINFO(ctypes.Structure):_fields_ = [("cb", DWORD),("lpReserved", LPTSTR),("lpDesktop", LPTSTR),("lpTitle", LPTSTR),("dwX", DWORD),("dwY", DWORD),("dwXSize", DWORD),("dwYSize", DWORD),("dwXcountChars", DWORD),("dwYcountChars", DWORD),("dwFillAttribute", DWORD),("dwFlags", DWORD),("wShowWindow", WORD),("cbReserved2", WORD),("lpReserved2", LPBYTE),("hStdIput", HANDLE),("hStdOutput", HANDLE),("hStdError", HANDLE)   ]# typedef struct _PROCESS_INFORMATION {  
#     HANDLE hProcess;  
#     HANDLE hThread;  
#     DWORD dwProcessId;  
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;


class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes layout of PROCESS_INFORMATION (see the typedef quoted above)."""
    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


class EXCEPTION_RECORD(ctypes.Structure):
    """Exception description carried by an exception debug event.

    The class is declared before its fields are assigned because one of the
    fields is a pointer to EXCEPTION_RECORD itself.
    """


EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]


class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Exception record plus the first-chance flag."""
    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


class DEBUG_EVENT_UNION(ctypes.Union):
    """Per-event payload of DEBUG_EVENT; only the exception member is declared."""
    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # Members that are not modelled yet:
        # ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll",           LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo",           RIP_INFO),
    ]


class DEBUG_EVENT(ctypes.Structure):
    """Debug event record: event code, originating process/thread, payload."""
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


class THREADENTRY32(ctypes.Structure):
    """One thread entry of a thread snapshot."""
    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]


class FLOATING_SAVE_AREA(ctypes.Structure):
    """Floating-point save area referenced by the commented-out CONTEXT below."""
    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


# class CONTEXT(ctypes.Structure):
#     _fields_ = [#         ("ContextFlags", DWORD),
#         ("Dr0", DWORD),
#         ("Dr1", DWORD),
#         ("Dr2", DWORD),
#         ("Dr3", DWORD),
#         ("Dr6", DWORD),
#         ("Dr7", DWORD),
#         ("FloatSave", FLOATING_SAVE_AREA),
#         ("SegGs", DWORD),
#         ("SegFs", DWORD),
#         ("SegEs", DWORD),
#         ("SegDs", DWORD),
#         ("Edi", DWORD),
#         ("Esi", DWORD),
#         ("Ebx", DWORD),
#         ("Edx", DWORD),
#         ("Ecx", DWORD),
#         ("Eax", DWORD),
#         ("Ebp", DWORD),
#         ("Eip", DWORD),
#         ("SegCs", DWORD),
#         ("EFlags", DWORD),
#         ("Esp", DWORD),
#         ("SegSs", DWORD),
#         ("ExtendedRegisters", BYTE * 512),
# ]
# 64-bit integer aliases used by the x64 CONTEXT definition.
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong


class M128A(ctypes.Structure):
    """128-bit value stored as an unsigned low half and a signed high half."""
    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


# class DUMMYUNIONNAME(Union):
#     _fields_=[
#               ("FltSave", XMM_SAVE_AREA32),
#               ("DummyStruct", DUMMYSTRUCTNAME)
#               ]# class DUMMYSTRUCTNAME(Structure):
#     _fields_=[
#               ("Header", M128A * 2),
#               ("Legacy", M128A * 8),
#               ("Xmm0", M128A),
#               ("Xmm1", M128A),
#               ("Xmm2", M128A),
#               ("Xmm3", M128A),
#               ("Xmm4", M128A),
#               ("Xmm5", M128A),
#               ("Xmm6", M128A),
#               ("Xmm7", M128A),
#               ("Xmm8", M128A),
#               ("Xmm9", M128A),
#               ("Xmm10", M128A),
#               ("Xmm11", M128A),
#               ("Xmm12", M128A),
#               ("Xmm13", M128A),
#               ("Xmm14", M128A),
#               ("Xmm15", M128A)
#               ]# class XMM_SAVE_AREA32(Structure):
#     _pack_ = 1 
#     _fields_ = [  
#                 ('ControlWord', WORD), 
#                 ('StatusWord', WORD), 
#                 ('TagWord', BYTE), 
#                 ('Reserved1', BYTE), 
#                 ('ErrorOpcode', WORD), 
#                 ('ErrorOffset', DWORD), 
#                 ('ErrorSelector', WORD), 
#                 ('Reserved2', WORD), 
#                 ('DataOffset', DWORD), 
#                 ('DataSelector', WORD), 
#                 ('Reserved3', WORD), 
#                 ('MxCsr', DWORD), 
#                 ('MxCsr_Mask', DWORD), 
#                 ('FloatRegisters', M128A * 8), 
#                 ('XmmRegisters', M128A * 16), 
#                 ('Reserved4', BYTE * 96)
#                 ]


class CONTEXT_UNION(ctypes.Union):
    """Opaque stand-in for the floating-point/XMM save area of the x64 CONTEXT.

    In the Win32 header this union holds XMM_SAVE_AREA32, which is 512 bytes
    (see the commented-out definition above: 32 bytes of control fields,
    8 + 16 M128A registers and 96 reserved bytes).  The placeholder must
    have the same size, otherwise every CONTEXT field that follows it is
    placed at the wrong offset and the structure is too small for
    GetThreadContext/SetThreadContext.
    """
    # 128 DWORDs = 512 bytes (was 32 DWORDs = 128 bytes).
    _fields_ = [("S", DWORD * 128)]
class CONTEXT(ctypes.Structure):
    """Thread register context for x64 (P*Home slots, segment, debug and
    general-purpose registers, Rip, FP/vector state, branch-trace fields)."""
    _fields_ = [
        # Home space for register parameters.
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        # Control flags.
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        # Segment registers and flags.
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        # Debug registers.
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        # Integer registers.
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        # Program counter.
        ("Rip", DWORD64),
        # Floating-point state (opaque placeholder union).
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        # Vector registers.
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        # Special debug control registers.
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


LPCONTEXT = ctypes.POINTER(CONTEXT)


class PROC_STRUCT(ctypes.Structure):
    """Processor architecture pair stored in SYSTEM_INFO_UNION."""
    _fields_ = [
        ("wProcessorArchitecture", WORD),
        ("wReserved", WORD),
    ]
class SYSTEM_INFO_UNION(ctypes.Union):
    """First member of SYSTEM_INFO: either a raw dwOemId or a PROC_STRUCT."""
    _fields_ = [
        ("dwOemId", DWORD),
        ("sProcStruc", PROC_STRUCT),
    ]
class SYSTEM_INFO(ctypes.Structure):
    """ctypes layout of the Win32 SYSTEM_INFO structure (used with GetSystemInfo)."""
    _fields_ = [
        ("uSysInfo", SYSTEM_INFO_UNION),
        ("dwPageSize", DWORD),
        ("lpMinimumApplicationAddress", LPVOID),
        ("lpMaximumApplicationAddress", LPVOID),
        # DWORD_PTR in the Win32 header, i.e. pointer-sized.  Declaring it as
        # a 4-byte DWORD shifts every following field on 64-bit Windows.
        ("dwActiveProcessorMask", ctypes.c_size_t),
        ("dwNumberOfProcessors", DWORD),
        ("dwProcessorType", DWORD),
        ("dwAllocationGranularity", DWORD),
        ("wProcessorLevel", WORD),
        ("wProcessorRevision", WORD),
    ]


# class MEMORY_BASIC_INFORMATION(ctypes.Structure):
#     _fields_ = [
#         ("BaseAddress", PVOID),
#         ("AllocationBase", PVOID),
#         ("AllocationProtect", DWORD),
#         ("PartitionId", WORD),
#         ("RegionSize", SIZE_T),
#         ("State", DWORD),
#         ("Protect", DWORD),
#         ("Type", DWORD),
# ]
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
    """Memory region description with explicit 64-bit fields.

    The two ``__alignment*`` members spell out the padding by hand instead
    of relying on implicit structure alignment.
    """
    _fields_ = [
        ("BaseAddress", ULONGLONG),
        ("AllocationBase", ULONGLONG),
        ("AllocationProtect", DWORD),
        ("__alignment1", DWORD),
        ("RegionSize", ULONGLONG),
        ("State", DWORD),
        ("Protect", DWORD),
        ("Type", DWORD),
        ("__alignment2", DWORD),
    ]


PMEMORY_BASIC_INFORMATION = ctypes.POINTER(MEMORY_BASIC_INFORMATION)
  • my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32


class debugger():
    """Small user-mode debugger for x64 Windows driven through kernel32.

    It can launch or attach to a process, pump debug events, and manage
    software (INT3), hardware (Dr0-Dr3) and memory (guard page) breakpoints.
    Methods follow the file's convention of returning ``False`` on failure.
    """

    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event that is currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # Software breakpoints: address -> original byte.
        self.breakpoints = {}
        self.first_breakpoint = True
        # Hardware breakpoints: slot -> (address, length, condition).
        self.hardware_breakpoints = {}
        # Memory breakpoints.
        self.guarded_pages = []
        self.memory_breakpoints = {}
        # Ask the system for its default page size.
        system_info = my_debugger_defines.SYSTEM_INFO()
        kernel32.GetSystemInfo(ctypes.byref(system_info))
        self.page_size = system_info.dwPageSize

    @staticmethod
    def _close_handle(handle):
        """Close *handle* with CloseHandle; falsy values are ignored."""
        if handle:
            kernel32.CloseHandle.argtypes = [ctypes.c_void_p]
            kernel32.CloseHandle.restype = ctypes.c_bool
            kernel32.CloseHandle(handle)

    @staticmethod
    def _set_thread_context(h_thread, context):
        """Write *context* to the thread behind *h_thread*; return success."""
        kernel32.SetThreadContext.argtypes = [
            ctypes.c_void_p, my_debugger_defines.LPCONTEXT]
        kernel32.SetThreadContext.restype = ctypes.c_bool
        return kernel32.SetThreadContext(h_thread, ctypes.byref(context))

    def load(self, path_to_exe):
        """Start *path_to_exe* under the debugger (DEBUG_PROCESS).

        On success the process id and an all-access process handle are
        stored and the debugger is marked active so that run() pumps events.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)
        if kernel32.CreateProcessW(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   ctypes.byref(startupinfo),
                                   ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
            # Remember the process so enumerate_threads()/detach() work.
            self.pid = process_information.dwProcessId
            # Keep our own handle to the new process for later access.
            self.h_process = self.open_process(process_information.dwProcessId)
            self.debugger_active = True
            # The handles returned by CreateProcess are not used afterwards.
            self._close_handle(process_information.hThread)
            self._close_handle(process_information.hProcess)
        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())

    def open_process(self, pid):
        """Open process *pid* with PROCESS_ALL_ACCESS and return the handle."""
        # Handles are pointer-sized; the default int restype would truncate.
        kernel32.OpenProcess.restype = ctypes.c_void_p
        return kernel32.OpenProcess(
            my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process *pid* via DebugActiveProcess."""
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Pump debug events for as long as the debugger is active."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it, and resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        # INFINITE: block until an event arrives; the call fills debug_event.
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                          my_debugger_defines.INFINITE):
            return
        # Capture the handle and register state of the reporting thread.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d" %
              (debug_event.dwDebugEventCode, debug_event.dwThreadId))
        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            self.exception = exception
            print("0x%08x" % exception)
            self.exception_address = record.ExceptionAddress
            if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                # Memory breakpoint.
                print("Guard Page Access Detected.")
            elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                # Hardware breakpoint.
                print("Single Stepping.")
                continue_status = self.exception_handler_single_step()
        # Let the debuggee continue with the chosen status.
        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)
        # The per-event thread handle is no longer needed.
        self._close_handle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle an INT3 exception and return the continue status."""
        print("[*] Exception address: 0x%08x" % self.exception_address)
        if self.exception_address not in self.breakpoints:
            # The first breakpoint is raised by Windows itself: just go on.
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
                return my_debugger_defines.DBG_CONTINUE
        else:
            print("[*] Hit user defined breakpoint.")
            # Put the original byte back where the INT3 was written.
            self.write_process_memory(self.exception_address,
                                      self.breakpoints[self.exception_address])
            # Fetch a fresh context and move Rip back onto the restored byte.
            self.context = self.get_thread_context(h_thread=self.h_thread)
            if self.context:
                self.context.Rip -= 1
                self._set_thread_context(self.h_thread, self.context)
                self.debugger_active = True
            else:
                self.debugger_active = False
        return my_debugger_defines.DBG_CONTINUE

    def exception_handler_single_step(self):
        """Handle a single-step exception raised by a hardware breakpoint.

        Returns DBG_CONTINUE when the exception belongs to one of our
        hardware breakpoints (which is then removed), otherwise
        DBG_EXCEPTION_NOT_HANDLED.
        """
        continue_status = my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED
        if not self.context:
            return continue_status
        # The low four bits of Dr6 tell which debug register fired.
        slot = None
        for candidate in range(4):
            if (self.context.Dr6 & (1 << candidate)
                    and candidate in self.hardware_breakpoints):
                slot = candidate
                break
        if slot is not None and self.bp_del_hw(slot):
            continue_status = my_debugger_defines.DBG_CONTINUE
            print("[*] Hardware breakpoint removed.")
        return continue_status

    def bp_del_hw(self, slot):
        """Remove the hardware breakpoint in *slot* from every thread."""
        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            try:
                context = self.get_thread_context(h_thread=h_thread)
                if not context:
                    continue
                # Clear the enable bit of the slot.
                context.Dr7 &= ~(1 << (slot * 2))
                # Zero the address register.
                setattr(context, "Dr%d" % slot, 0x0000000000000000)
                # Clear the condition bits.
                context.Dr7 &= ~(3 << ((slot * 4) + 16))
                # Clear the length bits.
                context.Dr7 &= ~(3 << ((slot * 4) + 18))
                self._set_thread_context(h_thread, context)
            finally:
                self._close_handle(h_thread)
        # Forget the breakpoint.
        self.hardware_breakpoints.pop(slot, None)
        return True

    def detach(self):
        """Stop debugging the process via DebugActiveProcessStop."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False

    def open_thread(self, thread_id):
        """Open *thread_id* with THREAD_ALL_ACCESS; return handle or False."""
        kernel32.OpenThread.restype = ctypes.c_void_p
        h_thread = kernel32.OpenThread(
            my_debugger_defines.THREAD_ALL_ACCESS, None, thread_id)
        # A NULL handle (failure) comes back as None with this restype.
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by self.pid, or False."""
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # TH32CS_SNAPTHREAD snapshots every thread in the system (the pid
        # argument is ignored for this flag), so owners are compared below.
        kernel32.CreateToolhelp32Snapshot.restype = ctypes.c_void_p
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        # Failure is reported as INVALID_HANDLE_VALUE (-1), not NULL.
        invalid_handle = ctypes.c_void_p(-1).value
        if snapshot is None or snapshot == invalid_handle:
            return False
        entry_pointer = ctypes.POINTER(my_debugger_defines.THREADENTRY32)
        kernel32.Thread32First.argtypes = [ctypes.c_void_p, entry_pointer]
        kernel32.Thread32Next.argtypes = [ctypes.c_void_p, entry_pointer]
        # dwSize must be initialised before the first call.
        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
        while success:
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot,
                                            ctypes.byref(thread_entry))
        self._close_handle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        Pass either an open handle (*h_thread*), which is left open for the
        caller, or a *thread_id*, for which a temporary handle is opened and
        closed again before returning.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = (my_debugger_defines.CONTEXT_FULL
                                | my_debugger_defines.CONTEXT_DEBUG_REGISTERS)
        opened_here = h_thread is None
        if opened_here:
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False
        kernel32.GetThreadContext.argtypes = [
            ctypes.c_void_p, my_debugger_defines.LPCONTEXT]
        kernel32.GetThreadContext.restype = ctypes.c_bool
        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            # Only close what this call opened; the caller owns its handle.
            if opened_here:
                self._close_handle(h_thread)

    def read_process_memory(self, address, length):
        """Read *length* bytes at *address*; return bytes or False."""
        read_buf = ctypes.create_string_buffer(length)
        # nSize and *lpNumberOfBytesRead are SIZE_T (pointer-sized).
        count = ctypes.c_size_t(0)
        kernel32.ReadProcessMemory.argtypes = [
            ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p,
            ctypes.c_size_t, ctypes.POINTER(ctypes.c_size_t)]
        kernel32.ReadProcessMemory.restype = ctypes.c_bool
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf,
                                          length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes *data* at *address*; return True on success."""
        # nSize and *lpNumberOfBytesWritten are SIZE_T (pointer-sized).
        count = ctypes.c_size_t(0)
        length = len(data)
        c_data = ctypes.c_char_p(data)
        kernel32.WriteProcessMemory.argtypes = [
            ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p,
            ctypes.c_size_t, ctypes.POINTER(ctypes.c_size_t)]
        kernel32.WriteProcessMemory.restype = ctypes.c_bool
        if not kernel32.WriteProcessMemory(self.h_process, address, c_data,
                                           length, ctypes.byref(count)):
            return False
        return True

    def bp_set(self, address):
        """Set a software (INT3) breakpoint at *address*.

        Returns True when the breakpoint is in place (including when it was
        already registered) and False when memory could not be read/written.
        """
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return True
        # Make the byte writable before patching it.
        old_protect = ctypes.c_ulong(0)
        kernel32.VirtualProtectEx.argtypes = [
            ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t,
            my_debugger_defines.DWORD, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.VirtualProtectEx.restype = ctypes.c_bool
        kernel32.VirtualProtectEx(self.h_process, address, 1,
                                  my_debugger_defines.PAGE_EXECUTE_READWRITE,
                                  ctypes.byref(old_protect))
        # Save the original byte, then overwrite it with the INT3 opcode.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def bp_set_hw(self, address, length, condition):
        """Set a hardware breakpoint on every thread of the debuggee.

        *length* must be 1, 2 or 4 and *condition* one of HW_ACCESS,
        HW_EXECUTE, HW_WRITE.  Returns False for invalid arguments or when
        all four debug registers are in use, True otherwise.
        """
        if length not in (1, 2, 4):
            return False
        # Dr7 encodes the length as (length - 1).
        length -= 1
        if condition not in (my_debugger_defines.HW_ACCESS,
                             my_debugger_defines.HW_EXECUTE,
                             my_debugger_defines.HW_WRITE):
            return False
        # Pick the first free debug register slot.
        available = next(
            (slot for slot in range(4)
             if slot not in self.hardware_breakpoints),
            None)
        if available is None:
            return False
        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            try:
                context = self.get_thread_context(h_thread=h_thread)
                if not context:
                    continue
                # Enable the slot in Dr7.
                context.Dr7 |= 1 << (available * 2)
                # Store the address in the matching Dr0-Dr3 register.
                setattr(context, "Dr%d" % available, address)
                # Trigger condition.
                context.Dr7 |= condition << ((available * 4) + 16)
                # Watched length.
                context.Dr7 |= length << ((available * 4) + 18)
                self._set_thread_context(h_thread, context)
            finally:
                self._close_handle(h_thread)
        # Record the breakpoint in the slot table.
        self.hardware_breakpoints[available] = (address, length, condition)
        return True

    def bp_set_mem(self, address, size):
        """Guard every page touched by [address, address + size).

        Returns True on success and False when the region cannot be queried
        or a page's protection cannot be changed.
        """
        # VirtualQueryEx yields the base address of the page holding address.
        mbi = my_debugger_defines.MEMORY_BASIC_INFORMATION()
        kernel32.VirtualQueryEx.argtypes = [
            ctypes.c_void_p, ctypes.c_void_p,
            my_debugger_defines.PMEMORY_BASIC_INFORMATION, ctypes.c_size_t]
        kernel32.VirtualQueryEx.restype = ctypes.c_size_t
        length = kernel32.VirtualQueryEx(self.h_process, address,
                                         ctypes.byref(mbi), ctypes.sizeof(mbi))
        if length < ctypes.sizeof(mbi):
            # Report the error only when the query actually failed.
            print("[*] Error with error code %d." % kernel32.GetLastError())
            return False
        kernel32.VirtualProtectEx.argtypes = [
            ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t,
            ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.VirtualProtectEx.restype = ctypes.c_bool
        current_page = mbi.BaseAddress
        # The last watched byte is address + size - 1, so a page starting
        # exactly at address + size is not part of the range.
        while current_page < address + size:
            self.guarded_pages.append(current_page)
            old_protection = ctypes.c_ulong(0)
            # Guard exactly one page per iteration.
            if not kernel32.VirtualProtectEx(
                    self.h_process, current_page, self.page_size,
                    mbi.Protect | my_debugger_defines.PAGE_GUARD,
                    ctypes.byref(old_protection)):
                return False
            current_page += self.page_size
        # Record the memory breakpoint.
        self.memory_breakpoints[address] = (address, size, mbi)
        return True

    def func_resolve(self, dll, function):
        """Return the address of *function* (bytes) in the loaded module *dll*.

        The lookup happens in the debugger's own process.  Returns None when
        the module is not loaded or the export is not found.
        """
        kernel32.GetModuleHandleW.argtypes = [ctypes.c_wchar_p]
        kernel32.GetModuleHandleW.restype = ctypes.c_void_p
        handle = kernel32.GetModuleHandleW(dll)
        if not handle:
            return None
        kernel32.GetProcAddress.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
        kernel32.GetProcAddress.restype = ctypes.c_void_p
        # A module handle from GetModuleHandle is not reference counted and
        # is not a kernel object handle, so it must not be closed.
        return kernel32.GetProcAddress(handle, function)
  • printf_loop.py
import ctypes
import time

# C runtime whose printf is called by the loop below.
msvcrt = ctypes.cdll.msvcrt

# Number of iterations printed so far.
counter = 0

# Print one line every two seconds, forever.
while True:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    counter += 1
    time.sleep(2)
  • my_test.py
import my_debugger
from my_debugger_defines import *

# Attach to a running process chosen by the user.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))

# Resolve printf and put a memory breakpoint on its first bytes.
printf_address = debugger.func_resolve("msvcrt.dll", b"printf")
if not printf_address:
    # func_resolve yields None when the lookup fails; formatting None with
    # %x would raise TypeError.
    print("[*] Unable to resolve printf in msvcrt.dll.")
else:
    print("[*] Address of printf: 0x%08x" % printf_address)
    print(debugger.bp_set_mem(printf_address, 10))
    debugger.run()
debugger.detach()

完结完结!!!


http://www.ppmy.cn/news/978026.html

相关文章

linux 系统编程

文件IO 从本章开始学习各种Linux系统函数,这些函数的用法必须结合Linux内核的工作原理来理解, 因为系统函数正是内核提供给应用程序的接口, 而要理解内核的工作原理,必须熟练掌握C语言, 因为内核也是用C语言写的, 我们在描述内核工作原理时必然要用“指针”、“结构体”、“链表…

企业面临的数据泄露途径有哪些?该如何防范?

随着数字经济蓬勃发展,数据对于企业的价值与重要性不断攀升,随之而来的数据安全风险也不断涌现。近年来,数据泄露事件时有发生,对企业财产安全、声誉等构成极大威胁。 常见的企业数据泄露途径有哪些? ● 内部员工泄露…

【920信号与系统笔记】第四章 连续时间系统的频域分析

第四章 连续时间系统的频域分析 4.1引言4.2信号通过系统的频域分析方法频域系统函数H(jw)系统在周期性信号激励下的频域分析系统在非周期信号激励下的频域分析周期信号和非周期信号分析方法比较 4.1引言 频域分析法 1.步骤 1.时域求解响应的问题通过傅里叶级数或者傅里叶变换转…

express 路由匹配和数据获取

express配置路由只需要通过app.method(url,func)来配置,其中url配置和其中的参数获取方法不同 直接写全路径 路由中允许存在. get请求传入的参数 router.get("/home", (req, res) => {res.status(200).send(req.query); });通过/home?a=1会收到对象…

AI面试官:Asp.Net 中使用Log4Net (一)

AI面试官:Asp.Net 中使用Log4Net (一) 当面试涉及到使用log4net日志记录框架的相关问题时,通常会聚焦在如何在.NET或.NET Core应用程序中集成和使用log4net。以下是一些关于log4net的面试题目,以及相应的解答、案例和代码: 文章目…

SOC FPGA介绍及开发设计流程

目录 一、SoC FPGA简介 二、SoC FPGA开发流程 2.1 硬件开发 2.2 软件开发 一、SoC FPGA简介 SOC FPGA是在FPGA架构中集成了基于ARM的硬核处理器系统(HPS),包括处理器、外设和存储器控制器。相较于传统的仅有ARM处理器或 FPGA 的嵌入式芯片,SOC FPGA既…

python_pyqtgraph常用语法记录

目录 全局设置 setConfigOptions() PyQtGraph的辅助函数 数据显示函数 Simple Data Display Functions 颜色、画笔和笔刷 Color,Pen,and Brush Functions Data Slicing 暂用不上 Coordinate Transformation 暂用不上 SI Unit Conversion Functions 暂用不上 Image Prepa…

苹果iOS 16.6 RC发布:或为iPhone X/8系列养老版本

今天苹果向iPhone用户推送了iOS 16.6 RC更新(内部版本号:20G75),这是时隔两个月的首次更新。 按照惯例RC版基本不会有什么问题,会在最近一段时间内直接变成正式版,向所有用户推送。 需要注意的是,鉴于iOS 17正式版即将…