1
0
mirror of https://github.com/oskar456/dzonegit.git synced 2024-05-11 05:55:41 +00:00
This commit is contained in:
Ondřej Caletka
2018-07-06 21:53:03 +02:00
commit 31556f7b13
3 changed files with 389 additions and 0 deletions

104
.gitignore vendored Normal file
View File

@ -0,0 +1,104 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/

155
dzonegit.py Normal file
View File

@ -0,0 +1,155 @@
#!/usr/bin/env python3
import subprocess
import re
from collections import namedtuple
from hashlib import sha256
from pathlib import Path
def get_head():
r = subprocess.run(
["git", "rev-parse", "--verify", "HEAD"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding="utf-8"
)
if r.returncode == 0:
return r.stdout.strip()
else:
# Initial commit: diff against an empty tree object
return "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
def check_whitespace_errors(against):
r = subprocess.run(
["git", "diff-index", "--check", "--cached", against],
)
if r.returncode != 0:
raise ValueError("Whitespace errors")
def get_file_contents(path, revision=""):
""" Return contents of a file in staged env or in some revision. """
r = subprocess.run(
["git", "show", f"{revision}:{path}"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding="utf-8",
check=True
)
return r.stdout
def compile_zone(zonename, zonedata):
""" Compile the zone. Return tuple with results."""
CompileResults = namedtuple("CompileResults", "success, serial, "
"zonehash, stderr")
r = subprocess.run(
["/usr/sbin/named-compilezone", "-o", "-", zonename, "/dev/stdin"],
input=zonedata,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
)
m = re.search(r"^zone.*loaded serial ([0-9]*)$", r.stderr, re.MULTILINE)
if r.returncode == 0 and m:
serial = m.group(1)
zonehash = sha256(r.stdout.encode("utf-8")).hexdigest()
return CompileResults(True, serial, zonehash, r.stderr)
else:
return CompileResults(False, None, None, r.stderr)
def is_serial_increased(old, new):
""" Return true if serial number was increased using RFC 1982 logic. """
old, new = (int(n) for n in [old, new])
diff = (new - old) % 2**32
return 0 < diff < (2**31 - 1)
def get_altered_files(against, diff_filter=None):
""" Return list of changed files. """
cmd = ["git", "diff", "--cached", "--name-only", "-z"]
if diff_filter:
cmd.append(f"--diff-filter={diff_filter}")
cmd.append(against)
r = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding="utf-8",
check=True
)
return (Path(p) for p in r.stdout.rstrip("\0").split("\0"))
def get_zone_origin(zonedata, maxlines=10):
"""
Parse $ORIGIN directive in first maxlines lines of zone file.
Return zone name without the trailing dot.
"""
for i, line in enumerate(zonedata.splitlines()):
if i >= maxlines:
break
m = re.match(r"^\$ORIGIN\s+([^ ]+)\.\s*(;.*)?$", line)
if m:
return m.group(1).lower()
def get_zone_name(path, zonedata):
"""
Try to guess zone name from either filename or the first $ORIGIN.
Throw a ValueError if filename and zone ORIGIN differ more than
in slashes.
"""
stemname = Path(path).stem.lower()
originname = get_zone_origin(zonedata)
if originname:
tt = str.maketrans("", "", "/_,:-+*%^&#$")
sn, on = [s.translate(tt) for s in [stemname, originname]]
if sn != on:
raise ValueError('Zone origin and zone file name differ.',
originname, stemname)
return originname
else:
return stemname
def check_updated_zones(against):
""" Check whether all updated zone files compile. """
for f in get_altered_files(against, "AM"):
if not f.suffix == ".zone":
continue
print(f"Checking file {f}")
zonedata = get_file_contents(f)
zname = get_zone_name(f, zonedata)
rnew = compile_zone(zname, zonedata)
if not rnew.success:
raise ValueError("New zone version does not compile", str(f),
rnew.stderr)
try:
zonedata = get_file_contents(f, against)
zname = get_zone_name(f, zonedata)
rold = compile_zone(zname, zonedata)
if (rold.success and rold.zonehash != rnew.zonehash and not
is_serial_increased(rold.serial, rnew.serial)):
raise ValueError("Zone contents changed without "
"increasing serial", f)
except subprocess.CalledProcessError:
pass # Old version of zone did not exist
def main():
against = get_head()
try:
check_whitespace_errors(against)
check_updated_zones(against)
except ValueError as e:
print("\n".join(e.args))
raise SystemExit(1)
if __name__ == "__main__":
main()

130
tests/test_dzonegit.py Normal file
View File

@ -0,0 +1,130 @@
import pytest
import contextlib
import os
import subprocess
from pathlib import Path
from dzonegit import *
@contextlib.contextmanager
def cwd(directory):
curdir = os.getcwd()
try:
os.chdir(Path(__file__).parent / directory)
yield
finally:
os.chdir(curdir)
def test_get_head_empty():
with cwd("emptyrepo"):
assert get_head() == "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
with cwd("testrepo"):
assert get_head() == "ca6f091201985bfb3e047b3bba8632235e1c0486"
def test_check_whitespace_errors():
with cwd("emptyrepo"):
with pytest.raises(ValueError):
check_whitespace_errors(get_head())
with cwd("testrepo"):
check_whitespace_errors(get_head())
def test_get_file_contents():
with cwd("testrepo"):
assert get_file_contents('dummy') == "dummy\n"
with pytest.raises(subprocess.CalledProcessError):
get_file_contents('nonexistent')
def test_compile_zone():
testzone = """
$ORIGIN example.com.
@ 60 IN SOA ns hostmaster (
1234567890 ; serial
3600 ; refresh (1 hour)
900 ; retry (15 minutes)
1814400 ; expire (3 weeks)
60 ; minimum (1 minute)
)
60 IN NS ns
ns.example.com. 60 IN A 192.0.2.1
"""
r = compile_zone("example.org", testzone)
assert not r.success
assert r.zonehash is None
assert r.stderr
r = compile_zone("example.com", testzone)
assert r.success
assert r.serial == "1234567890"
assert r.zonehash
r2 = compile_zone("example.com", testzone + "\n\n; some comment")
assert r.zonehash == r2.zonehash
def test_is_serial_increased():
assert is_serial_increased(1234567890, "2018010100")
assert is_serial_increased("2018010100", "4018010100")
assert is_serial_increased("4018010100", "1234567890")
assert not is_serial_increased(2018010100, "1234567890")
def test_get_altered_files():
with cwd("testrepo"):
files = set(get_altered_files("HEAD", "A"))
assert files == set([
Path("zones/example.org.zone")
])
def test_get_zone_origin():
testzone = """
$ORIGIN examPle.com. ;coment
@ 60 IN SOA ns hostmaster 1 60 60 60 60
60 IN NS ns
ns.example.com. 60 IN A 192.0.2.1
$ORIGIN sub
$ORIGIN subsub.example.com.
$ORIGIN example.com.
"""
assert "example.com" == get_zone_origin(testzone)
testzone = """
@ 60 IN SOA ns hostmaster 1 60 60 60 60
60 IN NS ns
ns.example.com. 60 IN A 192.0.2.1
"""
assert get_zone_origin(testzone) is None
testzone = """
@ 60 IN SOA ns hostmaster 1 60 60 60 60
60 IN NS ns
ns.example.com. 60 IN A 192.0.2.1
$ORIGIN sub.example.com.
"""
assert get_zone_origin(testzone, 4) is None
def test_get_zone_name():
testzone = """
$ORIGIN eXample.com. ;coment
@ 60 IN SOA ns hostmaster 1 60 60 60 60
60 IN NS ns
ns.example.com. 60 IN A 192.0.2.1
"""
assert "example.com" == get_zone_name("zones/example.com.zone", "")
assert "example.com" == get_zone_name("zones/example.com.zone", testzone)
with pytest.raises(ValueError):
get_zone_name("zones/example.org.zone", testzone)
testzone = """
$ORIGIN 240/28.2.0.192.in-addr.arpa.
@ 60 IN SOA ns hostmaster 1 60 60 60 60
60 IN NS ns
ns 60 IN A 192.0.2.1
"""
assert "240/28.2.0.192.in-addr.arpa" == get_zone_name(
"zones/240-28.2.0.192.in-addr.arpa.zone",
testzone
)
def test_check_updated_zones():
with cwd("emptyrepo"):
with pytest.raises(ValueError):
check_updated_zones(get_head())
with cwd("testrepo"):
check_updated_zones(get_head())