Files
AS_Network_List/generate_nft_blacklist.py
Alexander Serkin a3a5ec2ea8 Add nftables support with config generator and IP checker (#22)
* Add nftables support with config generator and IP checker

- Add generate_nft_blacklist.py for generating nftables configurations
- Add check_nft_blacklist.py for verifying IPs against blacklist
- Add blacklists_updater_nftables.sh for automated updates
- Add blacklists_nftables/ directory with generated configs
- Add GitHub Actions workflow for daily nftables updates
- Update README.md with nftables usage instructions

nftables is a modern replacement for iptables with better performance
and lower memory usage, especially for large rulesets. This addition
complements the existing iptables and nginx blacklist formats.

* Added nftables scripts help
2026-01-12 10:46:07 +01:00

156 lines
5.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
generate_nft_blacklist.py
Reads prefixes from a file or stdin, aggregates them and writes nftables config.
Uses named sets for efficient blacklist management.
Usage:
git clone https://github.com/C24Be/AS_Network_List.git
generate_nft_blacklist.py ./AS_Network_List/blacklists/blacklist.txt nft_bl.conf
cp nft_bl.conf /etc/nftables.d/
systemctl restart nftables
"""
import sys
from ipaddress import ip_network, collapse_addresses
from pathlib import Path
from datetime import datetime
def read_lines(path_or_dash):
if path_or_dash == "-":
print("Reading prefixes from STDIN...")
return [ln.rstrip("\n") for ln in sys.stdin]
p = Path(path_or_dash)
if not p.exists():
raise FileNotFoundError(f"Input file not found: {path_or_dash}")
text = p.read_text(encoding="utf-8")
return text.splitlines()
def aggregate_prefixes(lines):
v4, v6, invalid = [], [], []
for lineno, ln in enumerate(lines, start=1):
s = ln.strip()
if not s or s.startswith("#"):
continue
try:
net = ip_network(s, strict=False)
if net.version == 4:
v4.append(net)
else:
v6.append(net)
except Exception as e:
invalid.append((lineno, s, str(e)))
agg_v4 = list(collapse_addresses(sorted(v4, key=lambda x: (int(x.network_address), x.prefixlen))))
agg_v6 = list(collapse_addresses(sorted(v6, key=lambda x: (int(x.network_address), x.prefixlen))))
return agg_v4, agg_v6, invalid
def make_nft_config(agg_v4, agg_v6, comment=None):
lines = []
lines.append("# Autogenerated nftables blacklist")
lines.append(f"# Generated: {datetime.utcnow().isoformat()}Z")
if comment:
lines.append(f"# {comment}")
lines.append(f"# IPv4: {len(agg_v4)}, IPv6: {len(agg_v6)}")
lines.append("")
lines.append("table inet filter {")
lines.append("")
# Define IPv4 blacklist set
lines.append(" set blacklist_v4 {")
lines.append(" type ipv4_addr")
lines.append(" flags interval")
if agg_v4:
lines.append(" elements = {")
for i, net in enumerate(agg_v4):
comma = "," if i < len(agg_v4) - 1 else ""
lines.append(f" {net.with_prefixlen}{comma}")
lines.append(" }")
lines.append(" }")
lines.append("")
# Define IPv6 blacklist set
lines.append(" set blacklist_v6 {")
lines.append(" type ipv6_addr")
lines.append(" flags interval")
if agg_v6:
lines.append(" elements = {")
for i, net in enumerate(agg_v6):
comma = "," if i < len(agg_v6) - 1 else ""
lines.append(f" {net.with_prefixlen}{comma}")
lines.append(" }")
lines.append(" }")
lines.append("")
# Define input chain with set lookups
lines.append(" chain input {")
lines.append(" type filter hook input priority 0;")
lines.append(" policy accept;")
lines.append("")
lines.append(" ct state { established, related } accept")
lines.append("")
if agg_v4:
lines.append(" ip saddr @blacklist_v4 counter drop")
if agg_v6:
lines.append(" ip6 saddr @blacklist_v6 counter drop")
lines.append(" }")
lines.append("}")
return "\n".join(lines)
def write_output(outpath, content):
if outpath == "-":
print(content)
return
p = Path(outpath)
p.write_text(content, encoding="utf-8")
p.chmod(0o644)
print(f"Wrote nft config to: {p} (size: {p.stat().st_size} bytes)")
def main(argv):
if len(argv) < 3:
print("Usage: python3 generate_nft_blacklist.py input.txt output.conf")
print("Use '-' as input or output to mean STDIN/STDOUT respectively.")
return 2
infile, outfile = argv[1], argv[2]
try:
lines = read_lines(infile)
except Exception as e:
print(f"ERROR reading input: {e}", file=sys.stderr)
return 3
if not any(line.strip() and not line.strip().startswith("#") for line in lines):
print("WARNING: input contains no prefixes (empty or only comments). Nothing to aggregate.")
nft_conf = make_nft_config([], [], comment="Empty input produced no prefixes")
write_output(outfile, nft_conf)
return 0
agg_v4, agg_v6, invalid = aggregate_prefixes(lines)
if invalid:
print("Some lines could not be parsed (line, text, error):")
for ln, txt, err in invalid:
print(f" {ln}: '{txt}' --> {err}", file=sys.stderr)
print(f"Aggregated IPv4 prefixes: {len(agg_v4)}")
for n in agg_v4:
print(" v4:", n)
print(f"Aggregated IPv6 prefixes: {len(agg_v6)}")
for n in agg_v6:
print(" v6:", n)
nft_conf = make_nft_config(agg_v4, agg_v6, comment=f"Source: {infile}")
try:
write_output(outfile, nft_conf)
except Exception as e:
print(f"ERROR writing output: {e}", file=sys.stderr)
return 4
print("Done.")
print("Load with: sudo nft -f <output.conf>")
print("View counters: sudo nft list chain inet filter input -a")
print("View sets: sudo nft list set inet filter blacklist_v4")
print(" sudo nft list set inet filter blacklist_v6")
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))