casync unittests (#25306)

* simple extract test

* add test with loopback

* test for chunk reuse

* test seeding

* add instructions

* cleanup

* install casync

* make more tests work without a loopback
old-commit-hash: 753f4c7fab
taco
Willem Melching 3 years ago committed by GitHub
parent 283970114e
commit dfcd3a2910
  1. 15
      system/hardware/tici/casync.py
  2. 0
      system/hardware/tici/tests/__init__.py
  3. 153
      system/hardware/tici/tests/test_casync.py
  4. 1
      tools/ubuntu_setup.sh

@ -40,10 +40,12 @@ class ChunkReader(ABC):
class FileChunkReader(ChunkReader): class FileChunkReader(ChunkReader):
"""Reads chunks from a local file""" """Reads chunks from a local file"""
def __init__(self, fn: str) -> None: def __init__(self, fn: str) -> None:
super().__init__() super().__init__()
self.f = open(fn, 'rb') self.f = open(fn, 'rb')
def __del__(self):
self.f.close()
def read(self, chunk: Chunk) -> bytes: def read(self, chunk: Chunk) -> bytes:
self.f.seek(chunk.offset) self.f.seek(chunk.offset)
return self.f.read(chunk.length) return self.f.read(chunk.length)
@ -61,6 +63,10 @@ class RemoteChunkReader(ChunkReader):
sha_hex = chunk.sha.hex() sha_hex = chunk.sha.hex()
url = os.path.join(self.url, sha_hex[:4], sha_hex + ".cacnk") url = os.path.join(self.url, sha_hex[:4], sha_hex + ".cacnk")
if os.path.isfile(url):
with open(url, 'rb') as f:
contents = f.read()
else:
for i in range(CHUNK_DOWNLOAD_RETRIES): for i in range(CHUNK_DOWNLOAD_RETRIES):
try: try:
resp = self.session.get(url, timeout=CHUNK_DOWNLOAD_TIMEOUT) resp = self.session.get(url, timeout=CHUNK_DOWNLOAD_TIMEOUT)
@ -71,9 +77,10 @@ class RemoteChunkReader(ChunkReader):
time.sleep(CHUNK_DOWNLOAD_TIMEOUT) time.sleep(CHUNK_DOWNLOAD_TIMEOUT)
resp.raise_for_status() resp.raise_for_status()
contents = resp.content
decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_AUTO) decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_AUTO)
return decompressor.decompress(resp.content) return decompressor.decompress(contents)
def parse_caibx(caibx_path: str) -> List[Chunk]: def parse_caibx(caibx_path: str) -> List[Chunk]:
@ -120,6 +127,7 @@ def parse_caibx(caibx_path: str) -> List[Chunk]:
chunks.append(Chunk(sha, offset, length)) chunks.append(Chunk(sha, offset, length))
offset = new_offset offset = new_offset
caibx.close()
return chunks return chunks
@ -139,7 +147,8 @@ def extract(target: List[Chunk],
progress: Optional[Callable[[int], None]] = None): progress: Optional[Callable[[int], None]] = None):
stats: Dict[str, int] = defaultdict(int) stats: Dict[str, int] = defaultdict(int)
with open(out_path, 'wb') as out: mode = 'rb+' if os.path.exists(out_path) else 'wb'
with open(out_path, mode) as out:
for cur_chunk in target: for cur_chunk in target:
# Find source for desired chunk # Find source for desired chunk

@ -0,0 +1,153 @@
#!/usr/bin/env python3
import os
import unittest
import tempfile
import subprocess
import system.hardware.tici.casync as casync
# dd if=/dev/zero of=/tmp/img.raw bs=1M count=2
# sudo losetup -f /tmp/img.raw
# losetup -a | grep img.raw
LOOPBACK = os.environ.get('LOOPBACK', None)
class TestCasync(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.tmpdir = tempfile.TemporaryDirectory()
# Build example contents
chunk_a = [i % 256 for i in range(1024)] * 512
chunk_b = [(256 - i) % 256 for i in range(1024)] * 512
zeroes = [0] * (1024 * 128)
contents = chunk_a + chunk_b + zeroes + chunk_a
cls.contents = bytes(contents)
# Write to file
cls.orig_fn = os.path.join(cls.tmpdir.name, 'orig.bin')
with open(cls.orig_fn, 'wb') as f:
f.write(cls.contents)
# Create casync files
cls.manifest_fn = os.path.join(cls.tmpdir.name, 'orig.caibx')
cls.store_fn = os.path.join(cls.tmpdir.name, 'store')
subprocess.check_output(["casync", "make", "--compression=xz", "--store", cls.store_fn, cls.manifest_fn, cls.orig_fn])
target = casync.parse_caibx(cls.manifest_fn)
hashes = [c.sha.hex() for c in target]
# Ensure we have chunk reuse
assert len(hashes) > len(set(hashes))
def setUp(self):
# Clear target_lo
if LOOPBACK is not None:
self.target_lo = LOOPBACK
with open(self.target_lo, 'wb') as f:
f.write(b"0" * len(self.contents))
self.target_fn = os.path.join(self.tmpdir.name, next(tempfile._get_candidate_names()))
self.seed_fn = os.path.join(self.tmpdir.name, next(tempfile._get_candidate_names()))
def tearDown(self):
for fn in [self.target_fn, self.seed_fn]:
try:
os.unlink(fn)
except FileNotFoundError:
pass
def test_simple_extract(self):
target = casync.parse_caibx(self.manifest_fn)
sources = [('remote', casync.RemoteChunkReader(self.store_fn), casync.build_chunk_dict(target))]
stats = casync.extract(target, sources, self.target_fn)
with open(self.target_fn, 'rb') as target_f:
self.assertEqual(target_f.read(), self.contents)
self.assertEqual(stats['remote'], len(self.contents))
def test_seed(self):
target = casync.parse_caibx(self.manifest_fn)
# Populate seed with half of the target contents
with open(self.seed_fn, 'wb') as seed_f:
seed_f.write(self.contents[:len(self.contents) // 2])
sources = [('seed', casync.FileChunkReader(self.seed_fn), casync.build_chunk_dict(target))]
sources += [('remote', casync.RemoteChunkReader(self.store_fn), casync.build_chunk_dict(target))]
stats = casync.extract(target, sources, self.target_fn)
with open(self.target_fn, 'rb') as target_f:
self.assertEqual(target_f.read(), self.contents)
self.assertGreater(stats['seed'], 0)
self.assertLess(stats['remote'], len(self.contents))
def test_already_done(self):
"""Test that an already flashed target doesn't download any chunks"""
target = casync.parse_caibx(self.manifest_fn)
with open(self.target_fn, 'wb') as f:
f.write(self.contents)
sources = [('target', casync.FileChunkReader(self.target_fn), casync.build_chunk_dict(target))]
sources += [('remote', casync.RemoteChunkReader(self.store_fn), casync.build_chunk_dict(target))]
stats = casync.extract(target, sources, self.target_fn)
with open(self.target_fn, 'rb') as f:
self.assertEqual(f.read(), self.contents)
self.assertEqual(stats['target'], len(self.contents))
def test_chunk_reuse(self):
"""Test that chunks that are reused are only downloaded once"""
target = casync.parse_caibx(self.manifest_fn)
# Ensure target exists
with open(self.target_fn, 'wb'):
pass
sources = [('target', casync.FileChunkReader(self.target_fn), casync.build_chunk_dict(target))]
sources += [('remote', casync.RemoteChunkReader(self.store_fn), casync.build_chunk_dict(target))]
stats = casync.extract(target, sources, self.target_fn)
with open(self.target_fn, 'rb') as f:
self.assertEqual(f.read(), self.contents)
self.assertLess(stats['remote'], len(self.contents))
@unittest.skipUnless(LOOPBACK, "requires loopback device")
def test_lo_simple_extract(self):
target = casync.parse_caibx(self.manifest_fn)
sources = [('remote', casync.RemoteChunkReader(self.store_fn), casync.build_chunk_dict(target))]
stats = casync.extract(target, sources, self.target_lo)
with open(self.target_lo, 'rb') as target_f:
self.assertEqual(target_f.read(len(self.contents)), self.contents)
self.assertEqual(stats['remote'], len(self.contents))
@unittest.skipUnless(LOOPBACK, "requires loopback device")
def test_lo_chunk_reuse(self):
"""Test that chunks that are reused are only downloaded once"""
target = casync.parse_caibx(self.manifest_fn)
sources = [('target', casync.FileChunkReader(self.target_lo), casync.build_chunk_dict(target))]
sources += [('remote', casync.RemoteChunkReader(self.store_fn), casync.build_chunk_dict(target))]
stats = casync.extract(target, sources, self.target_lo)
with open(self.target_lo, 'rb') as f:
self.assertEqual(f.read(len(self.contents)), self.contents)
self.assertLess(stats['remote'], len(self.contents))
if __name__ == "__main__":
unittest.main()

@ -14,6 +14,7 @@ function install_ubuntu_common_requirements() {
autoconf \ autoconf \
build-essential \ build-essential \
ca-certificates \ ca-certificates \
casync \
clang \ clang \
cmake \ cmake \
make \ make \

Loading…
Cancel
Save