Source code for hugedict.misc

import functools
import gzip
import pickle
import threading
from typing import Any, Callable

import zstandard as zstd

from hugedict.types import T

# Thread-local holder for the zstd compressor/decompressor objects used by the
# helpers in this module. Attributes set on a threading.local() are visible
# only to the thread that set them, so the three objects created below exist
# only in the thread that imports this module; init_container() creates them
# on demand when the helpers run in any other thread.
container = threading.local()
container.zstd_3_compressor = zstd.ZstdCompressor(level=3)
container.zstd_6_compressor = zstd.ZstdCompressor(level=6)
container.zstd_decompressor = zstd.ZstdDecompressor()


def init_container(container):
    """Make sure ``container`` carries the zstd compressor/decompressor objects.

    The presence of a ``zstd_decompressor`` attribute is used as the marker
    that the container has already been set up; in that case nothing is
    touched. Otherwise a level-3 compressor, a level-6 compressor and a
    decompressor are created and stored on ``container``.

    Args:
        container: Object to populate in place. Callers in this module pass
            the module-level thread-local ``container``.
    """
    if hasattr(container, "zstd_decompressor"):
        # Already initialised (for this thread, when a thread-local is used).
        return

    container.zstd_3_compressor = zstd.ZstdCompressor(level=3)
    container.zstd_6_compressor = zstd.ZstdCompressor(level=6)
    container.zstd_decompressor = zstd.ZstdDecompressor()
def identity(x):
    """Return ``x`` unchanged (the very same object that was passed in)."""
    return x
def compress_pyobject(x):
    """Pickle ``x`` and gzip-compress the pickled bytes.

    Args:
        x: Any picklable Python object.

    Returns:
        The gzip-compressed pickle of ``x`` as ``bytes``.
    """
    pickled = pickle.dumps(x)
    # mtime=0 pins the timestamp field of the gzip header instead of using
    # the current time.
    return gzip.compress(pickled, mtime=0)
def decompress_pyobject(x):
    """Gunzip ``x`` and unpickle the resulting bytes.

    Args:
        x: Gzip-compressed pickle data.

    Returns:
        The Python object stored in the pickle.
    """
    raw = gzip.decompress(x)
    # NOTE: pickle.loads can execute arbitrary code while loading; only feed
    # this function data from a trusted source.
    return pickle.loads(raw)
def zstd3_compress(x):
    """Compress ``x`` with the level-3 zstd compressor held in ``container``.

    Args:
        x: Data to compress, passed straight to the compressor's
            ``compress`` method.

    Returns:
        Whatever the compressor's ``compress`` call returns.
    """
    # ``container`` is only read here, so no ``global`` declaration is needed.
    init_container(container)
    compressor = container.zstd_3_compressor
    return compressor.compress(x)
def zstd6_compress(x):
    """Compress ``x`` with the level-6 zstd compressor held in ``container``.

    Args:
        x: Data to compress, passed straight to the compressor's
            ``compress`` method.

    Returns:
        Whatever the compressor's ``compress`` call returns.
    """
    # ``container`` is only read here, so no ``global`` declaration is needed.
    init_container(container)
    compressor = container.zstd_6_compressor
    return compressor.compress(x)
def zstd_decompress(x):
    """Decompress ``x`` with the zstd decompressor held in ``container``.

    Args:
        x: Compressed data, passed straight to the decompressor's
            ``decompress`` method.

    Returns:
        Whatever the decompressor's ``decompress`` call returns.
    """
    # ``container`` is only read here, so no ``global`` declaration is needed.
    init_container(container)
    decompressor = container.zstd_decompressor
    return decompressor.decompress(x)
def compress_zstd3_pyobject(x):
    """Pickle ``x`` and compress the pickle with the level-3 zstd compressor.

    Args:
        x: Any picklable Python object.

    Returns:
        The result of compressing ``pickle.dumps(x)`` with the level-3
        compressor stored on ``container``.
    """
    # ``container`` is only read here, so no ``global`` declaration is needed.
    init_container(container)
    compressor = container.zstd_3_compressor
    return compressor.compress(pickle.dumps(x))
def compress_zstd6_pyobject(x):
    """Pickle ``x`` and compress the pickle with the level-6 zstd compressor.

    Args:
        x: Any picklable Python object.

    Returns:
        The result of compressing ``pickle.dumps(x)`` with the level-6
        compressor stored on ``container``.
    """
    # ``container`` is only read here, so no ``global`` declaration is needed.
    init_container(container)
    compressor = container.zstd_6_compressor
    return compressor.compress(pickle.dumps(x))
def decompress_zstd_pyobject(x):
    """Zstd-decompress ``x`` and unpickle the resulting bytes.

    Args:
        x: Zstd-compressed pickle data.

    Returns:
        The Python object stored in the pickle.
    """
    # ``container`` is only read here, so no ``global`` declaration is needed.
    init_container(container)
    raw = container.zstd_decompressor.decompress(x)
    # NOTE: pickle.loads can execute arbitrary code while loading; only feed
    # this function data from a trusted source.
    return pickle.loads(raw)
def compress_custom(ser_fn: Callable[[T], bytes]) -> Callable[[T], bytes]:
    """Wrap a serializer so that its output is gzip-compressed.

    Args:
        ser_fn: Function that turns a value into ``bytes``.

    Returns:
        A function carrying ``ser_fn``'s metadata (via ``functools.wraps``)
        that serializes its argument with ``ser_fn`` and gzip-compresses the
        result.
    """

    @functools.wraps(ser_fn)
    def wrapper(x):
        serialized = ser_fn(x)
        # mtime=0 pins the timestamp field of the gzip header instead of
        # using the current time.
        return gzip.compress(serialized, mtime=0)

    return wrapper
def decompress_custom(deser_fn: Callable[[bytes], T]) -> Callable[[bytes], T]:
    """Wrap a deserializer so that it accepts gzip-compressed input.

    Args:
        deser_fn: Function that turns ``bytes`` into a value.

    Returns:
        A function carrying ``deser_fn``'s metadata (via ``functools.wraps``)
        that gunzips its argument and passes the decompressed bytes to
        ``deser_fn``.
    """

    @functools.wraps(deser_fn)
    def wrapper(x):
        raw = gzip.decompress(x)
        return deser_fn(raw)

    return wrapper
def zstd3_compress_custom(ser_fn: Callable[[T], bytes]) -> Callable[[T], bytes]:
    """Wrap a serializer so that its output is zstd-compressed at level 3.

    Args:
        ser_fn: Function that turns a value into ``bytes``.

    Returns:
        A function carrying ``ser_fn``'s metadata (via ``functools.wraps``)
        that serializes its argument with ``ser_fn`` and compresses the
        result with the level-3 compressor stored on ``container``.
    """

    @functools.wraps(ser_fn)
    def wrapper(x):
        # ``container`` is only read here, so no ``global`` declaration is
        # needed. Initialisation happens per call so the compressor exists in
        # whichever thread runs the wrapper.
        init_container(container)
        serialized = ser_fn(x)
        return container.zstd_3_compressor.compress(serialized)

    return wrapper
def zstd6_compress_custom(ser_fn: Callable[[T], bytes]) -> Callable[[T], bytes]:
    """Wrap a serializer so that its output is zstd-compressed at level 6.

    Args:
        ser_fn: Function that turns a value into ``bytes``.

    Returns:
        A function carrying ``ser_fn``'s metadata (via ``functools.wraps``)
        that serializes its argument with ``ser_fn`` and compresses the
        result with the level-6 compressor stored on ``container``.
    """

    @functools.wraps(ser_fn)
    def wrapper(x):
        # ``container`` is only read here, so no ``global`` declaration is
        # needed. Initialisation happens per call so the compressor exists in
        # whichever thread runs the wrapper.
        init_container(container)
        serialized = ser_fn(x)
        return container.zstd_6_compressor.compress(serialized)

    return wrapper
def zstd_decompress_custom(deser_fn: Callable[[bytes], T]) -> Callable[[bytes], T]:
    """Wrap a deserializer so that it accepts zstd-compressed input.

    Args:
        deser_fn: Function that turns ``bytes`` into a value.

    Returns:
        A function carrying ``deser_fn``'s metadata (via ``functools.wraps``)
        that decompresses its argument with the decompressor stored on
        ``container`` and passes the result to ``deser_fn``.
    """

    @functools.wraps(deser_fn)
    def wrapper(x):
        # ``container`` is only read here, so no ``global`` declaration is
        # needed. Initialisation happens per call so the decompressor exists
        # in whichever thread runs the wrapper.
        init_container(container)
        raw = container.zstd_decompressor.decompress(x)
        return deser_fn(raw)

    return wrapper
class Chain2:
    """Composition of two single-argument callables, evaluated as ``g(f(args))``.

    Attributes are plain instance attributes:
        g: Callable applied second, to the result of ``f``.
        f: Callable applied first, to the input.
    """

    def __init__(self, g, f):
        self.g = g
        self.f = f

    def exec(self, args):
        """Apply ``f`` to ``args``, then ``g`` to that result, and return it."""
        inner = self.f(args)
        return self.g(inner)

    def __call__(self, args):
        """Perform the same ``g(f(args))`` computation as ``exec``."""
        inner = self.f(args)
        return self.g(inner)
[docs]class Chain3: def __init__(self, h, g, f): self.h = h self.g = g self.f = f
[docs] def exec(self, args): return self.h(self.g(self.f(args)))