mirror of
https://github.com/C24Be/AS_Network_List.git
synced 2026-01-24 23:26:38 +03:00
* Add nftables support with config generator and IP checker - Add generate_nft_blacklist.py for generating nftables configurations - Add check_nft_blacklist.py for verifying IPs against blacklist - Add blacklists_updater_nftables.sh for automated updates - Add blacklists_nftables/ directory with generated configs - Add GitHub Actions workflow for daily nftables updates - Update README.md with nftables usage instructions nftables is a modern replacement for iptables with better performance and lower memory usage, especially for large rulesets. This addition complements the existing iptables and nginx blacklist formats. * Added nftables scripts help
156 lines
5.3 KiB
Python
Executable File
156 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
generate_nft_blacklist.py
|
|
Reads prefixes from a file or stdin, aggregates them and writes nftables config.
|
|
Uses named sets for efficient blacklist management.
|
|
Usage:
|
|
git clone https://github.com/C24Be/AS_Network_List.git
|
|
generate_nft_blacklist.py ./AS_Network_List/blacklists/blacklist.txt nft_bl.conf
|
|
cp nft_bl.conf /etc/nftables.d/
|
|
systemctl restart nftables
|
|
"""
|
|
|
|
import sys
|
|
from ipaddress import ip_network, collapse_addresses
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
def read_lines(path_or_dash):
|
|
if path_or_dash == "-":
|
|
print("Reading prefixes from STDIN...")
|
|
return [ln.rstrip("\n") for ln in sys.stdin]
|
|
p = Path(path_or_dash)
|
|
if not p.exists():
|
|
raise FileNotFoundError(f"Input file not found: {path_or_dash}")
|
|
text = p.read_text(encoding="utf-8")
|
|
return text.splitlines()
|
|
|
|
def aggregate_prefixes(lines):
|
|
v4, v6, invalid = [], [], []
|
|
for lineno, ln in enumerate(lines, start=1):
|
|
s = ln.strip()
|
|
if not s or s.startswith("#"):
|
|
continue
|
|
try:
|
|
net = ip_network(s, strict=False)
|
|
if net.version == 4:
|
|
v4.append(net)
|
|
else:
|
|
v6.append(net)
|
|
except Exception as e:
|
|
invalid.append((lineno, s, str(e)))
|
|
agg_v4 = list(collapse_addresses(sorted(v4, key=lambda x: (int(x.network_address), x.prefixlen))))
|
|
agg_v6 = list(collapse_addresses(sorted(v6, key=lambda x: (int(x.network_address), x.prefixlen))))
|
|
return agg_v4, agg_v6, invalid
|
|
|
|
def make_nft_config(agg_v4, agg_v6, comment=None):
|
|
lines = []
|
|
lines.append("# Autogenerated nftables blacklist")
|
|
lines.append(f"# Generated: {datetime.utcnow().isoformat()}Z")
|
|
if comment:
|
|
lines.append(f"# {comment}")
|
|
lines.append(f"# IPv4: {len(agg_v4)}, IPv6: {len(agg_v6)}")
|
|
lines.append("")
|
|
lines.append("table inet filter {")
|
|
lines.append("")
|
|
|
|
# Define IPv4 blacklist set
|
|
lines.append(" set blacklist_v4 {")
|
|
lines.append(" type ipv4_addr")
|
|
lines.append(" flags interval")
|
|
if agg_v4:
|
|
lines.append(" elements = {")
|
|
for i, net in enumerate(agg_v4):
|
|
comma = "," if i < len(agg_v4) - 1 else ""
|
|
lines.append(f" {net.with_prefixlen}{comma}")
|
|
lines.append(" }")
|
|
lines.append(" }")
|
|
lines.append("")
|
|
|
|
# Define IPv6 blacklist set
|
|
lines.append(" set blacklist_v6 {")
|
|
lines.append(" type ipv6_addr")
|
|
lines.append(" flags interval")
|
|
if agg_v6:
|
|
lines.append(" elements = {")
|
|
for i, net in enumerate(agg_v6):
|
|
comma = "," if i < len(agg_v6) - 1 else ""
|
|
lines.append(f" {net.with_prefixlen}{comma}")
|
|
lines.append(" }")
|
|
lines.append(" }")
|
|
lines.append("")
|
|
|
|
# Define input chain with set lookups
|
|
lines.append(" chain input {")
|
|
lines.append(" type filter hook input priority 0;")
|
|
lines.append(" policy accept;")
|
|
lines.append("")
|
|
lines.append(" ct state { established, related } accept")
|
|
lines.append("")
|
|
if agg_v4:
|
|
lines.append(" ip saddr @blacklist_v4 counter drop")
|
|
if agg_v6:
|
|
lines.append(" ip6 saddr @blacklist_v6 counter drop")
|
|
lines.append(" }")
|
|
lines.append("}")
|
|
return "\n".join(lines)
|
|
|
|
def write_output(outpath, content):
|
|
if outpath == "-":
|
|
print(content)
|
|
return
|
|
p = Path(outpath)
|
|
p.write_text(content, encoding="utf-8")
|
|
p.chmod(0o644)
|
|
print(f"Wrote nft config to: {p} (size: {p.stat().st_size} bytes)")
|
|
|
|
def main(argv):
|
|
if len(argv) < 3:
|
|
print("Usage: python3 generate_nft_blacklist.py input.txt output.conf")
|
|
print("Use '-' as input or output to mean STDIN/STDOUT respectively.")
|
|
return 2
|
|
|
|
infile, outfile = argv[1], argv[2]
|
|
try:
|
|
lines = read_lines(infile)
|
|
except Exception as e:
|
|
print(f"ERROR reading input: {e}", file=sys.stderr)
|
|
return 3
|
|
|
|
if not any(line.strip() and not line.strip().startswith("#") for line in lines):
|
|
print("WARNING: input contains no prefixes (empty or only comments). Nothing to aggregate.")
|
|
nft_conf = make_nft_config([], [], comment="Empty input produced no prefixes")
|
|
write_output(outfile, nft_conf)
|
|
return 0
|
|
|
|
agg_v4, agg_v6, invalid = aggregate_prefixes(lines)
|
|
|
|
if invalid:
|
|
print("Some lines could not be parsed (line, text, error):")
|
|
for ln, txt, err in invalid:
|
|
print(f" {ln}: '{txt}' --> {err}", file=sys.stderr)
|
|
|
|
print(f"Aggregated IPv4 prefixes: {len(agg_v4)}")
|
|
for n in agg_v4:
|
|
print(" v4:", n)
|
|
print(f"Aggregated IPv6 prefixes: {len(agg_v6)}")
|
|
for n in agg_v6:
|
|
print(" v6:", n)
|
|
|
|
nft_conf = make_nft_config(agg_v4, agg_v6, comment=f"Source: {infile}")
|
|
try:
|
|
write_output(outfile, nft_conf)
|
|
except Exception as e:
|
|
print(f"ERROR writing output: {e}", file=sys.stderr)
|
|
return 4
|
|
|
|
print("Done.")
|
|
print("Load with: sudo nft -f <output.conf>")
|
|
print("View counters: sudo nft list chain inet filter input -a")
|
|
print("View sets: sudo nft list set inet filter blacklist_v4")
|
|
print(" sudo nft list set inet filter blacklist_v6")
|
|
return 0
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main(sys.argv))
|