Util Functions for Data Engineering in Computer Vision


File Handler


from abc import ABCMeta, abstractmethod

class BaseFileHandler(object):

    __metaclass__ = ABCMeta  # python 2 compatibility

    def load_from_fileobj(self, file, **kwargs):

    def dump_to_fileobj(self, obj, file, **kwargs):

    def dump_to_str(self, obj, **kwargs):

    def load_from_path(self, filepath, mode='r', **kwargs):
        with open(filepath, mode) as f:
            return self.load_from_fileobj(f, **kwargs)

    def dump_to_path(self, obj, filepath, mode='w', **kwargs):
        with open(filepath, mode) as f:
            self.dump_to_fileobj(obj, f, **kwargs)

Load from and dump into .json file in Python

import json

from .base import BaseFileHandler

class JsonHandler(BaseFileHandler):

    def load_from_fileobj(self, file):
        return json.load(file)

    def dump_to_fileobj(self, obj, file, **kwargs):
        json.dump(obj, file, **kwargs)

    def dump_to_str(self, obj, **kwargs):
        return json.dumps(obj, **kwargs)

Load from and dump into .pkl file in Python

from six.moves import cPickle as pickle

from .base import BaseFileHandler

class PickleHandler(BaseFileHandler):

    def load_from_fileobj(self, file, **kwargs):
        return pickle.load(file, **kwargs)

    def load_from_path(self, filepath, **kwargs):
        return super(PickleHandler, self).load_from_path(
            filepath, mode='rb', **kwargs)

    def dump_to_str(self, obj, **kwargs):
        kwargs.setdefault('protocol', 2)
        return pickle.dumps(obj, **kwargs)

    def dump_to_fileobj(self, obj, file, **kwargs):
        kwargs.setdefault('protocol', 2)
        pickle.dump(obj, file, **kwargs)

    def dump_to_path(self, obj, filepath, **kwargs):
        super(PickleHandler, self).dump_to_path(
            obj, filepath, mode='wb', **kwargs)

Load from and dump into .yml file in Python with YAML package

import yaml

    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

from .base import BaseFileHandler  # isort:skip

class YamlHandler(BaseFileHandler):

    def load_from_fileobj(self, file, **kwargs):
        kwargs.setdefault('Loader', Loader)
        return yaml.load(file, **kwargs)

    def dump_to_fileobj(self, obj, file, **kwargs):
        kwargs.setdefault('Dumper', Dumper)
        yaml.dump(obj, file, **kwargs)

    def dump_to_str(self, obj, **kwargs):
        kwargs.setdefault('Dumper', Dumper)
        return yaml.dump(obj, **kwargs)

Load list or dict from Text File

Load python list from text file:

Args: filename (str): Filename. prefix (str): The prefix to be inserted to the begining of each item. offset (int): The offset of lines. max_num (int): The maximum number of lines to be read, zeros and negatives mean no limitation.

Returns: list[str]: A list of strings.

def list_from_file(filename, prefix='', offset=0, max_num=0):
    """Load a text file and parse the content as a list of strings.

    cnt = 0
    item_list = []
    with open(filename, 'r') as f:
        for _ in range(offset):
        for line in f:
            if max_num > 0 and cnt >= max_num:
            item_list.append(prefix + line.rstrip('\n'))
            cnt += 1
    return item_list

Slice list in python

import itertools

def slice_list(in_list, lens):
    """Slice a list into several sub lists by a list of given length.

        in_list (list): The list to be sliced.
        lens(int or list): The expected length of each out list.

        list: A list of sliced list.
    if not isinstance(lens, list):
        raise TypeError('"indices" must be a list of integers')
    elif sum(lens) != len(in_list):
        raise ValueError(
            'sum of lens and list length does not match: {} != {}'.format(
                sum(lens), len(in_list)))
    out_list = []
    idx = 0
    for i in range(len(lens)):
        out_list.append(in_list[idx:idx + lens[i]])
        idx += lens[i]
    return out_list

Concat list in python

def concat_list(in_list):
    """Concatenate a list of list into a single list.

        in_list (list): The list of list to be merged.

        list: The concatenated flat list.
    return list(itertools.chain(*in_list))

Load python dict from text file

Each line of the text file will be two or more columns splited by whitespaces or tabs. The first column will be parsed as dict keys, and the following columns will be parsed as dict values.

Args: filename(str): Filename. key_type(type): Type of the dict’s keys. str is user by default and type conversion will be performed if specified.

Returns: dict: The parsed contents.

def dict_from_file(filename, key_type=str):
    """Load a text file and parse the content as a dict.
    mapping = {}
    with open(filename, 'r') as f:
        for line in f:
            items = line.rstrip('\n').split()
            assert len(items) >= 2
            key = key_type(items[0])
            val = items[1:] if len(items) > 2 else items[1]
            mapping[key] = val
    return mapping

Path Handling

import os
import os.path as osp
import sys
from pathlib import Path

import six

from .misc import is_str

if sys.version_info <= (3, 3):
    FileNotFoundError = IOError
    FileNotFoundError = FileNotFoundError

def is_filepath(x):
    if is_str(x) or isinstance(x, Path):
        return True
        return False

def fopen(filepath, *args, **kwargs):
    if is_str(filepath):
        return open(filepath, *args, **kwargs)
    elif isinstance(filepath, Path):
        return filepath.open(*args, **kwargs)

def check_file_exist(filename, msg_tmpl='file "{}" does not exist'):
    if not osp.isfile(filename):
        raise FileNotFoundError(msg_tmpl.format(filename))

def mkdir_or_exist(dir_name, mode=0o777):
    if dir_name == '':
    dir_name = osp.expanduser(dir_name)
    if six.PY3:
        os.makedirs(dir_name, mode=mode, exist_ok=True)
        if not osp.isdir(dir_name):
            os.makedirs(dir_name, mode=mode)

def symlink(src, dst, overwrite=True, **kwargs):
    if os.path.lexists(dst) and overwrite:
    os.symlink(src, dst, **kwargs)

def _scandir_py35(dir_path, suffix=None):
    for entry in os.scandir(dir_path):
        if not entry.is_file():
        filename = entry.name
        if suffix is None:
            yield filename
        elif filename.endswith(suffix):
            yield filename

def _scandir_py(dir_path, suffix=None):
    for filename in os.listdir(dir_path):
        if not osp.isfile(osp.join(dir_path, filename)):
        if suffix is None:
            yield filename
        elif filename.endswith(suffix):
            yield filename

def scandir(dir_path, suffix=None):
    if suffix is not None and not isinstance(suffix, (str, tuple)):
        raise TypeError('"suffix" must be a string or tuple of strings')
    if sys.version_info >= (3, 5):
        return _scandir_py35(dir_path, suffix)
        return _scandir_py(dir_path, suffix)

def find_vcs_root(path, markers=('.git', )):
    """Finds the root directory (including itself) of specified markers.

        path (str): Path of directory or file.
        markers (list[str], optional): List of file or directory names.

        The directory contained one of the markers or None if not found.
    if osp.isfile(path):
        path = osp.dirname(path)

    prev, cur = None, osp.abspath(osp.expanduser(path))
    while cur != prev:
        if any(osp.exists(osp.join(cur, marker)) for marker in markers):
            return cur
        prev, cur = cur, osp.split(cur)[0]
    return None

Image IO with OpenCV


import os.path as osp

import cv2
import numpy as np


imread_flags = {
    'color': IMREAD_COLOR,
    'grayscale': IMREAD_GRAYSCALE,
    'unchanged': IMREAD_UNCHANGED

Read image from path or bytes with flag

def imread(img_or_path, flag='color'):
    """Read an image.

        img_or_path (ndarray or str): Either a numpy array or image path.
            If it is a numpy array (loaded image), then it will be returned
            as is.
        flag (str): Flags specifying the color type of a loaded image,
            candidates are `color`, `grayscale` and `unchanged`.

        ndarray: Loaded image array.
    if isinstance(img_or_path, np.ndarray):
        return img_or_path
    elif is_str(img_or_path):
        flag = imread_flags[flag] if is_str(flag) else flag
                         'img file does not exist: {}'.format(img_or_path))
        return cv2.imread(img_or_path, flag)
        raise TypeError('"img" must be a numpy array or a filename')

def imfrombytes(content, flag='color'):
    """Read an image from bytes.

        content (bytes): Image bytes got from files or other streams.
        flag (str): Same as :func:`imread`.

        ndarray: Loaded image array.
    img_np = np.frombuffer(content, np.uint8)
    flag = imread_flags[flag] if is_str(flag) else flag
    img = cv2.imdecode(img_np, flag)
    return img

Write image with auto creating directory option

def imwrite(img, file_path, params=None, auto_mkdir=True):
    """Write image to file

        img (ndarray): Image array to be written.
        file_path (str): Image file path.
        params (None or list): Same as opencv's :func:`imwrite` interface.
        auto_mkdir (bool): If the parent folder of `file_path` does not exist,
            whether to create it automatically.

        bool: Successful or not.
    if auto_mkdir:
        dir_name = osp.abspath(osp.dirname(file_path))
    return cv2.imwrite(file_path, img, params)

Alternative Util functions for list, json, data I/O

import json
import pickle
import os
import shutil

#io functions of SCRC
def load_str_list(filename, end = '\n'):
    with open(filename, 'r') as f:
        str_list = f.readlines()
    str_list = [s[:-len(end)] for s in str_list]
    return str_list

def save_str_list(str_list, filename, end = '\n'):
    str_list = [s+end for s in str_list]
    with open(filename, 'w') as f:

def load_json(filename):
    with open(filename, 'r') as f:
        return json.load(f)

def save_json(json_obj, filename):
    with open(filename, 'w') as f:
        # json.dump(json_obj, f, separators=(',\n', ':\n'))
        json.dump(json_obj, f, indent = 0, separators = (',', ': '))

def mkdir_if_missing(output_dir):
  def mkdir_if_missing(output_dir)
  if not os.path.exists(output_dir):
def save_data(data, filename):
    with open(filename, 'wb') as f:
        pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)

def load_data(filename):
    with open(filename, 'rb') as f:
        data = pickle.load(f)
    return data

def copy(fn_src, fn_tar):
    shutil.copyfile(fn_src, fn_tar)

Code from mmcv