Hi极客们!今天我们来探索Python在网络安全领域的应用。通过Python强大的库和工具,我们可以进行网络扫描、漏洞检测等安全测试。请记住:技术无罪,但要遵纪守法!
环境准备:
# Install the third-party packages used by the examples below.
pip install scapy        # network packet processing
pip install requests     # HTTP request library
pip install paramiko     # SSH connections
pip install python-nmap  # port scanning
🔍 端口扫描器:
import socket
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PortScanner:
    """TCP connect scanner for a contiguous, inclusive port range.

    Only scan hosts you are authorised to test.
    """

    def __init__(self, target, start_port, end_port, max_workers=100, timeout=1.0):
        """Store the scan parameters.

        Args:
            target: Host name or IP address to probe.
            start_port: First port of the range (inclusive).
            end_port: Last port of the range (inclusive).
            max_workers: Upper bound on concurrently running probe threads.
            timeout: Per-connection timeout in seconds.
        """
        self.target = target
        self.start_port = start_port
        self.end_port = end_port
        self.max_workers = max_workers
        self.timeout = timeout
        self.open_ports = []

    def scan_port(self, port):
        """Probe one port and append it to ``open_ports`` if it accepts a connection."""
        try:
            # The context manager closes the socket even if connect_ex raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                if sock.connect_ex((self.target, port)) == 0:
                    self.open_ports.append(port)
        except (OSError, OverflowError):
            # Name-resolution failures, unreachable networks or an
            # out-of-range port number: treat the port as not open.
            pass

    def scan(self):
        """Scan the configured range.

        Returns:
            A ``(open_ports, elapsed_seconds)`` tuple. ``open_ports`` is sorted
            ascending and reflects this call only, so calling ``scan()`` again
            does not accumulate duplicates.
        """
        self.open_ports = []
        ports = range(self.start_port, self.end_port + 1)
        started = time.perf_counter()

        if ports:
            # A bounded pool avoids spawning one OS thread per port, which can
            # exhaust resources on wide ranges such as 1-65535.
            workers = max(1, min(self.max_workers, len(ports)))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                # Drain the iterator so the call blocks until every probe is done.
                list(pool.map(self.scan_port, ports))

        # Probes finish in arbitrary order; sort for a deterministic result.
        self.open_ports.sort()
        return self.open_ports, time.perf_counter() - started
🔒 SSH暴力破解防护:
import re
import time
from collections import defaultdict

try:
    # SSHGuardian itself never references paramiko; keep the import optional
    # so log monitoring also works where the package is not installed.
    import paramiko
except ImportError:
    paramiko = None


class SSHGuardian:
    """Watch an SSH auth log and blacklist IPs with repeated failed logins."""

    # Matches the "from <address> port <n>" part of a log line.
    _SOURCE_RE = re.compile(r"\bfrom\s+(\S+)\s+port\s+\d+")

    def __init__(self, log_file):
        """Create a guardian reading from ``log_file`` (path to the auth log)."""
        self.log_file = log_file
        self.attempt_counter = defaultdict(list)  # ip -> failure timestamps
        self.blacklist = set()
        self.max_attempts = 3
        self.time_window = 300  # seconds (5 minutes)

    def extract_ip(self, line):
        """Return the source address found in a log line, or ``None``.

        The last "from <address> port <n>" occurrence is used, because the
        user name earlier in the line is attacker-controlled and could itself
        contain that phrase.
        """
        matches = self._SOURCE_RE.findall(line)
        return matches[-1] if matches else None

    def monitor_log(self):
        """Follow the log file and record every failed password attempt.

        Loops forever, sleeping one second whenever no new line is available.
        """
        with open(self.log_file, 'r', errors='replace') as f:
            while True:
                line = f.readline()
                if not line:
                    time.sleep(1)
                    continue
                if "Failed password for" not in line:
                    continue
                ip = self.extract_ip(line)
                if ip is None:
                    # Line has no recognisable source address; nothing to count.
                    continue
                self.attempt_counter[ip].append(time.time())
                self.check_attempts(ip)

    def check_attempts(self, ip):
        """Drop expired attempts for ``ip`` and blacklist it when over the limit."""
        cutoff = time.time() - self.time_window
        # Discard attempts that fell out of the time window.
        recent = [t for t in self.attempt_counter[ip] if t >= cutoff]
        self.attempt_counter[ip] = recent

        # Announce only the transition, not every later failure.
        if len(recent) >= self.max_attempts and ip not in self.blacklist:
            self.blacklist.add(ip)
            print(f"IP {ip} has been blacklisted!")
📡 网络嗅探器:
from scapy.all import *
from scapy.layers.http import HTTPRequest


class NetworkSniffer:
    """Capture packets on one interface and record HTTP requests and UDP flows.

    Only capture traffic on networks you are authorised to monitor.
    """

    def __init__(self, interface):
        """Create a sniffer bound to the network interface named ``interface``."""
        self.interface = interface
        self.packet_count = 0
        self.captured_data = []

    def packet_callback(self, packet):
        """Count the packet and dispatch it to the TCP or UDP analyzer."""
        self.packet_count += 1
        if packet.haslayer(TCP):
            self.analyze_tcp(packet)
        elif packet.haslayer(UDP):
            self.analyze_udp(packet)

    def analyze_tcp(self, packet):
        """Record method, path and endpoints of an HTTP request, if present."""
        # Method/Path are fields of HTTPRequest, not of the generic HTTP
        # wrapper layer. The IP check avoids an IndexError on non-IPv4 packets.
        if not (packet.haslayer(HTTPRequest) and packet.haslayer(IP)):
            return
        request = packet[HTTPRequest]
        if request.fields.get('Method'):
            self.captured_data.append({
                'type': 'HTTP',
                'method': request.Method,
                'path': request.Path,
                'src': packet[IP].src,
                'dst': packet[IP].dst,
            })

    def analyze_udp(self, packet):
        """Record endpoints and ports of an IPv4 UDP datagram."""
        if not packet.haslayer(IP):
            return
        self.captured_data.append({
            'type': 'UDP',
            'src': packet[IP].src,
            'dst': packet[IP].dst,
            'sport': packet[UDP].sport,
            'dport': packet[UDP].dport,
        })

    def start_capture(self, duration=None):
        """Sniff on the interface for ``duration`` seconds (``None`` = no time limit)."""
        print(f"Starting capture on {self.interface}")
        sniff(
            iface=self.interface,
            prn=self.packet_callback,
            timeout=duration,
            # The return value is unused, so do not keep every packet in memory.
            store=False,
        )
🛡️ Web应用防火墙:
import re
import time
from collections import defaultdict


class WebFirewall:
    """Minimal request filter: IP blocklist, per-window rate limit, regex rules."""

    def __init__(self, max_requests=100, window_seconds=60):
        """Create a firewall.

        Args:
            max_requests: Requests allowed per client IP within one window.
            window_seconds: Length of the rate-limit window in seconds.
        """
        self.rules = []
        self.blocked_ips = set()
        self.request_counts = defaultdict(int)  # ip -> requests in current window
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._window_start = {}  # ip -> monotonic time the current window began

    def add_rule(self, pattern, action):
        """Register a regex ``pattern`` with its ``action`` (only 'block' rejects)."""
        self.rules.append({
            'pattern': re.compile(pattern),
            'action': action,
        })

    def check_request(self, request):
        """Return ``True`` if the request may pass, ``False`` if it is rejected.

        ``request`` must expose ``remote_addr`` and ``values``. A client that
        exceeds ``max_requests`` within one window is added to ``blocked_ips``
        and stays blocked.
        """
        client_ip = request.remote_addr

        # Already banned clients are rejected immediately.
        if client_ip in self.blocked_ips:
            return False

        # Start a fresh window when none exists or the current one has expired;
        # without this reset the counter is a lifetime total, not a rate.
        now = time.monotonic()
        started = self._window_start.get(client_ip)
        if started is None or now - started >= self.window_seconds:
            self._window_start[client_ip] = now
            self.request_counts[client_ip] = 0

        self.request_counts[client_ip] += 1
        if self.request_counts[client_ip] > self.max_requests:
            self.blocked_ips.add(client_ip)
            return False

        # Reject when any 'block' rule matches the request values.
        payload = str(request.values)
        for rule in self.rules:
            if rule['action'] == 'block' and rule['pattern'].search(payload):
                return False
        return True
🔐 密码强度检测器:
import re


class PasswordChecker:
    """Score a password against five simple composition rules."""

    def __init__(self):
        """Set the minimum length and the set of accepted special characters."""
        self.min_length = 8
        self.special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?"

    def check_strength(self, password):
        """Evaluate ``password``.

        Returns:
            A dict with ``score`` (0-5, one point per satisfied rule),
            ``strength`` (a label) and ``feedback`` (one message per
            unsatisfied rule, in rule order).
        """
        # (rule satisfied?, message shown when it is not)
        checks = [
            (len(password) >= self.min_length, "密码太短"),
            (re.search(r"[A-Z]", password) is not None, "需要包含大写字母"),
            (re.search(r"[a-z]", password) is not None, "需要包含小写字母"),
            (re.search(r"\d", password) is not None, "需要包含数字"),
            (any(ch in self.special_chars for ch in password), "需要包含特殊字符"),
        ]
        score = sum(passed for passed, _ in checks)
        feedback = [message for passed, message in checks if not passed]

        labels = ['很弱', '弱', '一般', '强', '很强']
        # Five rules give scores 0..5 but there are only five labels; clamp so
        # a password that satisfies every rule maps to the top label instead
        # of raising IndexError.
        strength = labels[min(score, len(labels) - 1)]

        return {
            'score': score,
            'strength': strength,
            'feedback': feedback,
        }
📊 安全日志分析器:
import re
from collections import defaultdict


class LogAnalyzer:
    """Scan a log file for known attack signatures and summarise the hits."""

    # ISO-like "2024-01-02 03:04:05" / "2024-01-02T03:04:05"
    # or syslog-style "Jan  2 03:04:05".
    _TIMESTAMP_RE = re.compile(
        r"\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}"
        r"|[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}"
    )

    def __init__(self):
        """Set up the signature table and an empty alert store."""
        # attack type -> regex whose first group captures the source IPv4 address
        self.patterns = {
            'login_failure': r'Failed login.*from (\d+\.\d+\.\d+\.\d+)',
            'sql_injection': r'SQL injection attempt.*from (\d+\.\d+\.\d+\.\d+)',
            'xss_attempt': r'XSS attempt detected.*from (\d+\.\d+\.\d+\.\d+)',
        }
        self.alerts = defaultdict(list)

    def extract_timestamp(self, line):
        """Return the first timestamp-looking substring of ``line``, or ``None``.

        The value is returned as the raw matched text; it is not parsed.
        """
        match = self._TIMESTAMP_RE.search(line)
        return match.group(0) if match else None

    def analyze_log(self, log_file):
        """Read ``log_file`` line by line and record every signature match."""
        # errors='replace' keeps one undecodable byte from aborting the scan.
        with open(log_file, 'r', errors='replace') as f:
            for line in f:
                for attack_type, pattern in self.patterns.items():
                    match = re.search(pattern, line)
                    if match is None:
                        continue
                    self.alerts[attack_type].append({
                        'ip': match.group(1),
                        'timestamp': self.extract_timestamp(line),
                        'raw_log': line.strip(),
                    })

    def generate_report(self):
        """Return a dict with per-type alert counts, the total, and all alerts."""
        per_type = {kind: len(hits) for kind, hits in self.alerts.items()}
        return {
            'summary': {
                'total_alerts': sum(per_type.values()),
                'attack_types': per_type,
            },
            'details': self.alerts,
        }
🌐 漏洞扫描器:
class VulnerabilityScanner:
    """Send a few probe payloads to a URL and record likely XSS / SQLi findings.

    Only scan applications you are authorised to test.
    """

    XSS_PAYLOADS = (
        '<script>alert(1)</script>',
        '"><script>alert(1)</script>',
        '"><img src=x onerror=alert(1)>',
    )

    # Probes must be non-destructive: a stacked "DROP TABLE" statement could
    # delete data on a target that really is vulnerable, so a lone quote is
    # used to provoke a syntax error instead.
    SQLI_PAYLOADS = (
        "' OR '1'='1",
        "1' OR '1'='1",
        "1'",
    )

    # Lower-case fragments of common database error messages. Matching on the
    # bare word "error" flags almost every page (e.g. any "onerror" handler).
    SQL_ERROR_SIGNATURES = (
        "you have an error in your sql syntax",
        "warning: mysql",
        "unclosed quotation mark",
        "quoted string not properly terminated",
        "syntax error at or near",
        "sqlite3.operationalerror",
        "sqlstate",
    )

    def __init__(self, http_client=None, timeout=10):
        """Create a scanner.

        Args:
            http_client: Object exposing a requests-compatible
                ``get(url, params=..., headers=..., timeout=...)``. Defaults
                to the ``requests`` module.
            timeout: Per-request timeout in seconds, so an unresponsive
                target cannot hang the scan.
        """
        self.vulnerabilities = []
        self.headers = {
            'User-Agent': 'Security Scanner (Testing Purpose Only)'
        }
        self.timeout = timeout
        if http_client is None:
            # Imported lazily so the class can be used with an injected client
            # where requests is not installed.
            import requests as http_client
        self.http_client = http_client

    def _fetch(self, url, params):
        """GET ``url`` with ``params``; return the response, or None on failure."""
        try:
            return self.http_client.get(
                url,
                params=params,
                headers=self.headers,
                timeout=self.timeout,
            )
        except OSError:
            # requests.RequestException derives from IOError (OSError), so
            # this covers connection errors, timeouts and invalid URLs.
            return None

    def scan_xss(self, url):
        """Record a finding for each XSS payload echoed back in the response."""
        for payload in self.XSS_PAYLOADS:
            response = self._fetch(url, {'q': payload})
            if response is not None and payload in response.text:
                self.vulnerabilities.append({
                    'type': 'XSS',
                    'url': url,
                    'payload': payload,
                })

    def scan_sql_injection(self, url):
        """Record a finding for each payload that triggers a database error message."""
        for payload in self.SQLI_PAYLOADS:
            response = self._fetch(url, {'id': payload})
            if response is None:
                continue
            body = response.text.lower()
            if any(signature in body for signature in self.SQL_ERROR_SIGNATURES):
                self.vulnerabilities.append({
                    'type': 'SQL Injection',
                    'url': url,
                    'payload': payload,
                })
⚡ 网络安全最佳实践:
- 定期更新系统和软件
- 使用强密码和双因素认证
- 加密敏感数据
- 监控网络流量
- 备份重要数据
- 进行安全培训
今天的安全教程到此结束!我们学习了:
- 端口扫描
- SSH防护
- 网络嗅探
- Web防火墙
- 密码检测
- 日志分析
- 漏洞扫描
记住:网络安全是一个持续的过程,需要不断学习和更新知识!