Source code for hugedict.misc
import functools
import gzip
import pickle
import threading
from typing import Any, Callable
import zstandard as zstd
from hugedict.types import T
container = threading.local()
container.zstd_3_compressor = zstd.ZstdCompressor(level=3)
container.zstd_6_compressor = zstd.ZstdCompressor(level=6)
container.zstd_decompressor = zstd.ZstdDecompressor()
[docs]def init_container(container):
if not hasattr(container, "zstd_decompressor"):
container.zstd_3_compressor = zstd.ZstdCompressor(level=3)
container.zstd_6_compressor = zstd.ZstdCompressor(level=6)
container.zstd_decompressor = zstd.ZstdDecompressor()
[docs]def zstd3_compress(x):
global container
init_container(container)
return container.zstd_3_compressor.compress(x)
[docs]def zstd6_compress(x):
global container
init_container(container)
return container.zstd_6_compressor.compress(x)
[docs]def zstd_decompress(x):
global container
init_container(container)
return container.zstd_decompressor.decompress(x)
[docs]def compress_zstd3_pyobject(x):
global container
init_container(container)
return container.zstd_3_compressor.compress(pickle.dumps(x))
[docs]def compress_zstd6_pyobject(x):
global container
init_container(container)
return container.zstd_6_compressor.compress(pickle.dumps(x))
[docs]def decompress_zstd_pyobject(x):
global container
init_container(container)
return pickle.loads(container.zstd_decompressor.decompress(x))
[docs]def compress_custom(ser_fn: Callable[[T], bytes]) -> Callable[[T], bytes]:
@functools.wraps(ser_fn)
def wrapper(x):
return gzip.compress(ser_fn(x), mtime=0)
return wrapper
[docs]def decompress_custom(deser_fn: Callable[[bytes], T]) -> Callable[[bytes], T]:
@functools.wraps(deser_fn)
def wrapper(x):
return deser_fn(gzip.decompress(x))
return wrapper
[docs]def zstd3_compress_custom(ser_fn: Callable[[T], bytes]) -> Callable[[T], bytes]:
@functools.wraps(ser_fn)
def wrapper(x):
global container
init_container(container)
return container.zstd_3_compressor.compress(ser_fn(x))
return wrapper
[docs]def zstd6_compress_custom(ser_fn: Callable[[T], bytes]) -> Callable[[T], bytes]:
@functools.wraps(ser_fn)
def wrapper(x):
global container
init_container(container)
return container.zstd_6_compressor.compress(ser_fn(x))
return wrapper
[docs]def zstd_decompress_custom(deser_fn: Callable[[bytes], T]) -> Callable[[bytes], T]:
@functools.wraps(deser_fn)
def wrapper(x):
global container
init_container(container)
return deser_fn(container.zstd_decompressor.decompress(x))
return wrapper
[docs]class Chain2:
def __init__(self, g, f):
self.g = g
self.f = f
def __call__(self, args):
return self.g(self.f(args))