Files
AS_Network_List/generate_nft_blacklist.py
2026-03-27 19:11:52 +01:00

175 lines
6.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
generate_nft_blacklist.py
Reads prefixes from a file or stdin, aggregates them and writes nftables config.
Uses named sets for efficient blacklist management.
Usage:
git clone https://github.com/C24Be/AS_Network_List.git
generate_nft_blacklist.py ./AS_Network_List/blacklists/blacklist.txt nft_bl.conf
cp nft_bl.conf /etc/nftables.d/
systemctl restart nftables
"""
import sys
from ipaddress import ip_network, collapse_addresses
from pathlib import Path
from datetime import datetime, UTC
def read_lines(path_or_dash):
if path_or_dash == "-":
print("Reading prefixes from STDIN...")
return [ln.rstrip("\n") for ln in sys.stdin]
p = Path(path_or_dash)
if not p.exists():
raise FileNotFoundError(f"Input file not found: {path_or_dash}")
text = p.read_text(encoding="utf-8")
return text.splitlines()
def aggregate_prefixes(lines):
v4, v6, invalid = [], [], []
for lineno, ln in enumerate(lines, start=1):
s = ln.strip()
if not s or s.startswith("#"):
continue
try:
net = ip_network(s, strict=False)
if net.version == 4:
v4.append(net)
else:
v6.append(net)
except Exception as e:
invalid.append((lineno, s, str(e)))
agg_v4 = list(collapse_addresses(sorted(v4, key=lambda x: (int(x.network_address), x.prefixlen))))
agg_v6 = list(collapse_addresses(sorted(v6, key=lambda x: (int(x.network_address), x.prefixlen))))
return agg_v4, agg_v6, invalid
def make_nft_config(agg_v4, agg_v6, comment=None, usage_profile="vm_input"):
if usage_profile == "vk_forward":
set_v4_name = "blacklist_vk_v4"
set_v6_name = "blacklist_vk_v6"
rule_v4 = f'sudo nft add rule inet filter forward iifname "<VPN_IFACE>" ip daddr @{set_v4_name} counter reject'
rule_v6 = f'sudo nft add rule inet filter forward iifname "<VPN_IFACE>" ip6 daddr @{set_v6_name} counter reject'
else:
set_v4_name = "blacklist_v4"
set_v6_name = "blacklist_v6"
rule_v4 = f"sudo nft add rule inet filter input ip saddr @{set_v4_name} counter reject"
rule_v6 = f"sudo nft add rule inet filter input ip6 saddr @{set_v6_name} counter reject"
lines = []
lines.append("# Autogenerated nftables blacklist")
lines.append(f"# Generated: {datetime.now(UTC).isoformat().replace('+00:00', 'Z')}")
if comment:
lines.append(f"# {comment}")
lines.append(f"# IPv4: {len(agg_v4)}, IPv6: {len(agg_v6)}")
lines.append("#")
lines.append("# Usage:")
lines.append("# sudo nft -f <this-file>")
if usage_profile == "vk_forward":
lines.append("# # VK egress blocking for VPN clients via NAT/FORWARD")
lines.append("# sudo nft add chain inet filter forward '{ type filter hook forward priority 0; policy accept; }'")
lines.append(f"# {rule_v4}")
lines.append(f"# {rule_v6}")
else:
lines.append("# # VM protection from incoming blacklist sources")
lines.append("# sudo nft add chain inet filter input '{ type filter hook input priority 0; policy accept; }'")
lines.append(f"# {rule_v4}")
lines.append(f"# {rule_v6}")
lines.append("")
lines.append("table inet filter {")
lines.append("")
# Define IPv4 blacklist set
lines.append(f" set {set_v4_name} {{")
lines.append(" type ipv4_addr")
lines.append(" flags interval")
if agg_v4:
lines.append(" elements = {")
for i, net in enumerate(agg_v4):
comma = "," if i < len(agg_v4) - 1 else ""
rendered_net = net.with_prefixlen if hasattr(net, "with_prefixlen") else str(net)
lines.append(f" {rendered_net}{comma}")
lines.append(" }")
lines.append(" }")
lines.append("")
# Define IPv6 blacklist set
lines.append(f" set {set_v6_name} {{")
lines.append(" type ipv6_addr")
lines.append(" flags interval")
if agg_v6:
lines.append(" elements = {")
for i, net in enumerate(agg_v6):
comma = "," if i < len(agg_v6) - 1 else ""
rendered_net = net.with_prefixlen if hasattr(net, "with_prefixlen") else str(net)
lines.append(f" {rendered_net}{comma}")
lines.append(" }")
lines.append(" }")
lines.append("")
lines.append("}")
return "\n".join(lines)
def write_output(outpath, content):
if outpath == "-":
print(content)
return
p = Path(outpath)
p.write_text(content, encoding="utf-8")
p.chmod(0o644)
print(f"Wrote nft config to: {p} (size: {p.stat().st_size} bytes)")
def main(argv):
if len(argv) < 3:
print("Usage: python3 generate_nft_blacklist.py input.txt output.conf")
print("Use '-' as input or output to mean STDIN/STDOUT respectively.")
return 2
infile, outfile = argv[1], argv[2]
try:
lines = read_lines(infile)
except Exception as e:
print(f"ERROR reading input: {e}", file=sys.stderr)
return 3
if not any(line.strip() and not line.strip().startswith("#") for line in lines):
print("WARNING: input contains no prefixes (empty or only comments). Nothing to aggregate.")
profile = "vk_forward" if "vk" in Path(infile).name.lower() else "vm_input"
nft_conf = make_nft_config([], [], comment="Empty input produced no prefixes", usage_profile=profile)
write_output(outfile, nft_conf)
return 0
agg_v4, agg_v6, invalid = aggregate_prefixes(lines)
if invalid:
print("Some lines could not be parsed (line, text, error):")
for ln, txt, err in invalid:
print(f" {ln}: '{txt}' --> {err}", file=sys.stderr)
print(f"Aggregated IPv4 prefixes: {len(agg_v4)}")
for n in agg_v4:
print(" v4:", n)
print(f"Aggregated IPv6 prefixes: {len(agg_v6)}")
for n in agg_v6:
print(" v6:", n)
profile = "vk_forward" if "vk" in Path(infile).name.lower() else "vm_input"
nft_conf = make_nft_config(agg_v4, agg_v6, comment=f"Source: {infile}", usage_profile=profile)
try:
write_output(outfile, nft_conf)
except Exception as e:
print(f"ERROR writing output: {e}", file=sys.stderr)
return 4
print("Done.")
print("Load with: sudo nft -f <output.conf>")
if profile == "vk_forward":
print("View sets: sudo nft list set inet filter blacklist_vk_v4")
print(" sudo nft list set inet filter blacklist_vk_v6")
else:
print("View sets: sudo nft list set inet filter blacklist_v4")
print(" sudo nft list set inet filter blacklist_v6")
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))