Сериализация данных в python
RichApp
Опубликована 15 апреля 2020 в 14:02:41

Сериализация данных в python

python3jsoncsvprotobufbenchmark
Сериализация данных в json, csv, protobuf.

Процесс перевода объектов в последовательность байт называется сериализацией. Процесс перевода последовательности байт в объект называется десериализацией.

В статье мы рассмотрим следующие форматы:

Json, csv - текстовые форматы. Protobuf, pickle - бинарные. Json, csv, protobuf распространены во всех языках программирования. Pickle работает только в python.

В статье мы замерим время сериализации и десериализации данных в эти форматы. Сравним размер сериализованных данных в этих форматах. Рассмотрим удобство их использования.

Ссылка на видео ролик: https://youtu.be/9HDZvx6ZvZ8.

Исходный код: https://github.com/richapp-ru/blog-2.

Функция для генерации данных

Напишем функцию, которая генерирует массив данных. Элемент этого массива будет dict с полями:

  1. id: integer - уникальный идентификатор записи;
  2. type: enum - тип записи;
  3. amount: integer - сумма;
  4. time: float - timestamp записи;
  5. message: string - сообщение.

Функция принимает на вход параметры:

  1. size: integer - размер генерируемого массива данных;
  2. startId: integer - стартовое значение поля id с значением по умолчанию.

generate_data.py

from time import time
from random import choice as randchoice
from random import randrange

START_ID = 10000000
TYPES = ["перевод", "снятие", "пополнение"]
START_AMOUNT = 3000
END_AMOUNT = 1000000
STEP = 1000


def generateData(size, startId=START_ID):
    data = []
    for i in range(size):
        data.append({
            "id": START_ID + i,
            "type": randchoice(TYPES),
            "amount": randrange(START_AMOUNT, END_AMOUNT, STEP),
            "time": time(),
            "message": "Transaction #{}".format(i)
        })
    return data

Функция для оценки скорости сериализации

Функция измеряет время сериализации массива данных. Среднее время сериализации и размер сериализованных данных функция записывает в singleton Result. Затем сериализованные данные записываются в файл results/serialized-<serializeType>. Текствые форматы сериализции возвращают текст, бинарные - байты. Поэтому в функции есть проверка на тип переменной serializedData.

Функция принимает на вход параметры:

  1. iterations: integer - количество итераций для подсчёта среднего времени;
  2. serializeType: enum - формат сериализации;
  3. data: array of dict - массив данных для сериализации.

Функции сериализации на вход принимают массив данных, а возвращают сериализованные данные в строке или байтах.

Функции сериализации в json и pickle стандартной библиотеки имеют интерфейс описанный выше. Функция сериализации в json сторонней библиотеки ujson тоже имеет такой интерфейс.

Функции сериализации в csv и protobuf имеют другой интерфейс. Функции сериализации в данные форматы будут описаны в соответствующих разделах ниже.

serialize_benchmark.py

import os
from time import time
import json
import pickle
import ujson
import progressbar

from result import Result
from serialize_types import SerializeTypes
from csv_format import csvSerialize, csvDictSerialize
from protobuf_format import protobufSerialize

serializeFunctions = {}
serializeFunctions[SerializeTypes.json] = json.dumps
serializeFunctions[SerializeTypes.ujson] = ujson.dumps
serializeFunctions[SerializeTypes.csv] = csvSerialize
serializeFunctions[SerializeTypes.csvDict] = csvDictSerialize
serializeFunctions[SerializeTypes.protobuf] = protobufSerialize
serializeFunctions[SerializeTypes.pickle] = pickle.dumps


def serializeBenchmark(iterations, serializeType, data):
    fn = serializeFunctions[serializeType]
    totalTime = float(0)
    print("[{}] Start serialize bechmark".format(serializeType))
    for i in progressbar.progressbar(range(iterations)):
        start = time()
        serializedData = fn(data)
        totalTime += time() - start


    Result.addSerializeBenchmark(
        serializeType,
        totalTime / float(iterations),
        len(serializedData)
    )

    flags = "w"
    if type(serializedData) == bytes:
        flags = "wb"
    with open(os.path.join(Result.RESULTS_DIR, "serialized-{}".format(serializeType)), flags) as fstream:
        fstream.write(serializedData)

    return serializedData

Функция для оценки скорости десериализации

Функция измеряет время сериализации массива данных. Среднее время десериализации записывает в singleton Result. Затем десериализованные данные записываются в файл results/deserialized-<serializeType>.csv в формате csv.

Результирующие csv файлы можно сравнить, чтобы убедиться что все функции десериализации возвращают одинаковые значения.

Функция принимает на вход параметры:

  1. iterations: integer - количество итераций для подсчёта среднего времени;
  2. serializeType: enum - формат сериализации;
  3. serializedData: string of bytes - сериализованные данные в формате serializeType.

Функции десериализации на вход принимают сериализованные данные в виде строки или массива байтов, а возвращают массив данных. Для бинарных форматов на вход байты, текстовых — строка.

Функции десериализации в json и pickle стандартной библиотеки имеют интерфейс описанный выше. Функция десериализации в json сторонней библиотеки ujson тоже имеет такой интерфейс.

Функции десериализации в csv и protobuf имеют другой интерфейс. Функции десериализации в данные форматы будут описаны в соответствующих разделах ниже.

deserialize_benchmark.py

import os
from time import time
import json
import pickle
import ujson
import progressbar

from result import Result
from helpers import sizePretty
from serialize_types import SerializeTypes
from csv_format import csvSerialize, csvDeserialize, csvDictDeserialize
from protobuf_format import protobufDeserialize

deserializeFunctions = {}
deserializeFunctions[SerializeTypes.json] = json.loads
deserializeFunctions[SerializeTypes.ujson] = ujson.loads
deserializeFunctions[SerializeTypes.csv] = csvDeserialize
deserializeFunctions[SerializeTypes.csvDict] = csvDictDeserialize
deserializeFunctions[SerializeTypes.protobuf] = protobufDeserialize
deserializeFunctions[SerializeTypes.pickle] = pickle.loads


def deserializeBenchmark(iterations, serializeType, serializedData):
    fn = deserializeFunctions[serializeType]
    totalTime = float(0)
    print("[{}] Start deserialize benchmark".format(serializeType))
    for i in progressbar.progressbar(range(iterations)):
        start = time()
        data = fn(serializedData)
        totalTime += time() - start

    Result.addDeserializeBenchmark(
        serializeType,
        totalTime / float(iterations)
    )
    with open(os.path.join(Result.RESULTS_DIR, "deserialized-{}.csv".format(serializeType)), "w") as fstream:
        fstream.write(csvSerialize(data))

    return data

Функции сериализации и десериализации в формат CSV

Функции сериализации в csv стандартной библиотеки на вход принимают stream, потому что csv подразумевает работу с файлами. Стандартными средствами python можно создать stream в памяти, что и будет в функции ниже.

Ниже описаны два способа работы с библиотекой csv. Первый — использование Reader и Writer, запись храниться в виде массива. Второй — использование DictReader и DictWriter, запись храниться в виде словаря. Первый способ быстрее, второй — удобнее.

csv_format.py

import io
import csv


def csvSerialize(data):
    stream = io.StringIO()
    writer = csv.writer(stream)
    writer.writerow(["id", "type", "amount", "time", "message"])
    for item in data:
        writer.writerow([
            item["id"],
            item["type"],
            item["amount"],
            item["time"],
            item["message"],
        ])
    return stream.getvalue()


def csvDeserialize(serializedData):
    stream = io.StringIO(serializedData)
    reader = csv.reader(stream)
    next(reader)

    result = []
    for item in reader:
        result.append({
            "id": int(item[0]),
            "type": item[1],
            "amount": int(item[2]),
            "time": float(item[3]),
            "message": item[4]
        })
    return result


def csvDictSerialize(data):
    stream = io.StringIO()
    writer = csv.DictWriter(stream, fieldnames=["id", "type", "amount", "time", "message"])
    writer.writeheader()
    for item in data:
        writer.writerow(item)
    return stream.getvalue()


def csvDictDeserialize(serializedData):
    stream = io.StringIO(serializedData)
    reader = csv.DictReader(stream)

    result = []
    for item in reader:
        item["id"] = int(item["id"])
        item["amount"] = int(item["amount"])
        item["time"] = float(item["time"])
        result.append(item)
    return result

Функции сериализации и десериализации в формат protobuf

Для сериализации в формат protobuf надо объявить формат сериализуемых данных в файле proto. Затем из этого файла компилатором protoc генерируются функции сериализации и десериализации.

В файле proto объявим две структуры message. Первая - Transaction, элемент из массива данных. Вторая - Data, массив элементов Transaction.

protobuf_format/data.proto

syntax = "proto3";

message Transaction {
    int32 id = 1;
    enum Type {
        transfer = 0;
        withdraw = 1;
        fillup = 2;
    }
    Type type = 2;
    int32 amount = 3;
    double time = 4;
    string message = 5;
}

message Data {
    repeated Transaction transactions = 1;
}

Скрипт генерации файла protobuf_format/data_pb2.py из файла protobuf_format/data.proto.

protobuf_format/generate.sh

#!/usr/bin/env bash
curDir=$(cd $(dirname ${BASH_SOURCE}) && pwd)
protoc --proto_path=${curDir} --python_out=${curDir} ${curDir}/data.proto

Функция сериализации в protobuf принимает на вход сгенерированную protobuf структуру Data. Функция десериализации возвращает protobuf структуру Data. Ниже объявлены две функции сериализации и десериализации, которые принимают и отдают массив данных в формате функций оценки производительности.

protobuf_format/protobuf_format.py

from protobuf_format.data_pb2 import Data, Transaction

typeToProto = {
    "перевод": "transfer",
    "снятие": "withdraw",
    "пополнение": "fillup",
}
typeFromProto = {v: k for k, v in typeToProto.items()}


def protobufSerialize(data):
    protoData = Data()
    for item in data:
        protoData.transactions.append(Transaction(
            id=item["id"],
            type=typeToProto[item["type"]],
            amount=item["amount"],
            time=item["time"],
            message=item["message"]
        ))
    return protoData.SerializeToString()


def protobufDeserialize(serializedData):
    data = []

    protoData = Data()
    protoData.ParseFromString(serializedData)
    for transaction in protoData.transactions:
        data.append({
            "id": transaction.id,
            "type": typeFromProto[Transaction.Type.Name(transaction.type)],
            "amount": transaction.amount,
            "time": transaction.time,
            "message": transaction.message,
        })
    return data

Срипт для запуска оценки производительности

Параметры, который скрипт принимает на вход:

  1. --iteration: integer - количество итераций для подсчёта среднего времени в функциях сериализации и десериализации (по умолчанию 100);
  2. --data-size: integer - количество данных, которое будет сгенерировано функцией generateData.

Скрипт по очереди запускает serializeBenchmark и deserializeBenchmark для форматов сериализации, описанных в файле serialize_types.py. В конце скрипт выводит отчет с оценкой среднего времени выполнения функции за количество итераций --iteration.

main.py

#!/usr/bin/env python3
import os
import sys
import argparse

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

sys.path.append(CUR_DIR)
from result import Result
from serialize_types import TYPES_LIST
from csv_format import csvSerialize
from generate_data import generateData
from serialize_benchmark import serializeBenchmark
from deserialize_benchmark import deserializeBenchmark


def main():
    parser = argparse.ArgumentParser("")

    parser.add_argument("--iterations", dest="iterations", type=int, default=100)
    parser.add_argument("--data-size", dest="dataSize", type=int, required=True)

    args = parser.parse_args()
    data = generateData(args.dataSize)

    Result.init()

    for serializeType in TYPES_LIST:
        serializedData = serializeBenchmark(args.iterations, serializeType, data)
        deserializedData = deserializeBenchmark(args.iterations, serializeType, serializedData)

    Result.report()

if __name__ == "__main__":
    main()

serialize_types.py

class SerializeTypes(object):
    json = "json"
    ujson = "ujson"
    csv = "csv"
    csvDict = "csvDict"
    protobuf = "protobuf"
    pickle = "pickle"

TYPES_LIST = [
    SerializeTypes.json,
    SerializeTypes.ujson,
    SerializeTypes.csv,
    SerializeTypes.csvDict,
    SerializeTypes.protobuf,
    SerializeTypes.pickle
]

result.py

import os
import shutil
from helpers import sizePretty
from serialize_types import TYPES_LIST


class Result(object):

    RESULTS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "results")
    MAX_TYPE_LEN = max([len(item) for item in TYPES_LIST])
    MAX_TIME_LEN = 0
    MAX_SIZE_LEN = 0
    PERCENTAGE = 10
    _serializeBenchmarks = []
    _deserializeBenchmarks = []

    @classmethod
    def _printLine(cls):
        print("+-{}-+-{}-+-{}-+-{}-+".format(
            "-".ljust(cls.MAX_TYPE_LEN, "-"),
            "-".ljust(cls.MAX_TIME_LEN, "-"),
            "-".ljust(cls.MAX_SIZE_LEN, "-"),
            "-".ljust(cls.PERCENTAGE, "-"),
        ))

    @classmethod
    def _printHeader(cls, title, fields):
        print("")
        print(title)
        cls._printLine()
        print("| {} | {} | {} | {} |".format(
            fields[0].ljust(cls.MAX_TYPE_LEN),
            fields[1].ljust(cls.MAX_TIME_LEN),
            fields[2].ljust(cls.MAX_SIZE_LEN),
            fields[3].ljust(cls.PERCENTAGE)
        ))
        cls._printLine()

    @classmethod
    def init(cls):
        shutil.rmtree(cls.RESULTS_DIR, ignore_errors=True)
        os.mkdir(cls.RESULTS_DIR)

    @classmethod
    def addSerializeBenchmark(cls, serializeType, time, size):
        cls._serializeBenchmarks.append({
            "type": serializeType,
            "time": time,
            "formatedTime": "{:.6f}".format(time),
            "size": size,
            "formatedSize": sizePretty(size),
        })

    @classmethod
    def addDeserializeBenchmark(cls, serializeType, time):
        cls._deserializeBenchmarks.append({
            "type": serializeType,
            "time": time,
            "formatedTime": "{:.6f}".format(time),
        })

    @classmethod
    def report(cls):
        cls.minSerializeTime = cls._serializeBenchmarks[0]["time"]
        for item in cls._serializeBenchmarks:
            if len(item["formatedTime"]) > cls.MAX_TIME_LEN:
                cls.MAX_TIME_LEN = len(item["formatedTime"])
            if len(item["formatedSize"]) > cls.MAX_SIZE_LEN:
                cls.MAX_SIZE_LEN = len(item["formatedSize"])
            if item["time"] < cls.minSerializeTime:
                cls.minSerializeTime = item["time"]

        cls.minDeserializeTime = cls._deserializeBenchmarks[0]["time"]
        for item in cls._deserializeBenchmarks:
            if len(item["formatedTime"]) > cls.MAX_TIME_LEN:
                cls.MAX_TIME_LEN = len(item["formatedTime"])
            if item["time"] < cls.minDeserializeTime:
                cls.minDeserializeTime = item["time"]

        cls._printHeader("Serialize report:", ["Type", "Time", "Size", "Percentage"])
        for bench in cls._serializeBenchmarks:
            bench["percentage"] = str(int(bench["time"] / cls.minSerializeTime * 100)) + "%"
            print("| {} | {} | {} | {} |".format(
                bench["type"].ljust(cls.MAX_TYPE_LEN),
                bench["formatedTime"].ljust(cls.MAX_TIME_LEN),
                bench["formatedSize"].ljust(cls.MAX_SIZE_LEN),
                bench["percentage"].ljust(cls.PERCENTAGE),
            ))
        cls._printLine()

        cls._printHeader("Deserialize report:", ["Type", "Time", "", "Percentage"])
        for bench in cls._deserializeBenchmarks:
            bench["percentage"] = str(int(bench["time"] / cls.minDeserializeTime * 100)) + "%"
            print("| {} | {} | {} | {} |".format(
                bench["type"].ljust(cls.MAX_TYPE_LEN),
                bench["formatedTime"].ljust(cls.MAX_TIME_LEN),
                "".ljust(cls.MAX_SIZE_LEN),
                bench["percentage"].ljust(cls.PERCENTAGE),
            ))
        cls._printLine()

helpers.py

sizeMetrics = [
    "B",
    "KB",
    "MB",
    "GB",
    "TB",
]


def sizePretty(size, index=0):
    if size == 0:
        return "0"

    if index == len(sizeMetrics) - 1 or float(size) / float(1024) < 1:
        return "{:.2f} {}".format(size, sizeMetrics[index])
    else:
        return sizePretty(float(size) / float(1024), index+1)

Результат оценки производительности

./main.py --data-size 10000 --iteration 100000

Serialize report:
+----------+-------------+-----------+------------+
| Type     | Median Time | Size      | Percentage |
+----------+-------------+-----------+------------+
| json     | 0.021359    | 1.46 MB   | 550%       |
| ujson    | 0.009171    | 1.37 MB   | 236%       |
| csv      | 0.022120    | 607.39 KB | 570%       |
| csvDict  | 0.044757    | 607.39 KB | 1154%      |
| protobuf | 0.037837    | 392.58 KB | 975%       |
| pickle   | 0.003877    | 641.64 KB | 100%       |
+----------+-------------+-----------+------------+

Deserialize report:
+----------+-------------+-----------+------------+
| Type     | Median Time |           | Percentage |
+----------+-------------+-----------+------------+
| json     | 0.018542    |           | 339%       |
| ujson    | 0.011978    |           | 219%       |
| csv      | 0.020979    |           | 383%       |
| csvDict  | 0.051998    |           | 951%       |
| protobuf | 0.024407    |           | 446%       |
| pickle   | 0.005467    |           | 100%       |
+----------+-------------+-----------+------------+

Процессор, на котором проводилась оценка:

CPU(s):         4
CPU family:     6
Model:          60
Model name:     Intel(R) Core(TM) i5-4440 CPU @ 3.10GHz
Stepping:       3
CPU MHz:        3274.683
CPU max MHz:    3300,0000
CPU min MHz:    800,0000
BogoMIPS:       6196.63
Virtualization: VT-x
L1d cache:      32K
L1i cache:      32K
L2 cache:       256K
L3 cache:       6144K

Вывод

В python самый быстрый формат сериализации и десериализации - pickle. Формат pickle поддерживается только в python.

Следом по скорости идёт ujson и json. Формат json поддерживается всеми языками программирования.

В python у protobuf низкая скорость сериализации и десериализации. Размер сообщения в protobuf наименьший. Этот формат поддерживает много языков программирования.

csv.DictReader удобно использовать, но если нужная производительность, то лучше использовать простой csv.Reader. Формат csv поддерживается всеми языками программрования.

Замечание

В формат protobuf встроенна возможность валидации значений. Для валидации json можно использовать описание в json-schema. Скорее всего protobuf с валидацией выиграет по производительности json с валидацией по json-schema. Но это уже другая оценка производительности.