historian-data-compression 0.0.14

Creator: bradpython12


Historian Data Compression
Historian Data Compression is a Python library that compresses historian data with the deadband algorithm, the swinging door algorithm, or both.
Historian data typically consist of two dataframe columns: a timestamp and a logged value.
Project description
Based on the swinging door library of Aleksandr F. Mikhaylov (ChelAxe).
The extra timeout parameter defaults to 0, which means 'no timeout'.
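To make the roles of the deviation and the timeout concrete, here is a minimal pure-Python sketch of a deadband filter. It is not the library's implementation: the function name `dead_band_sketch` is hypothetical, and the reading of the timeout as "keep a point once this many seconds have passed since the last kept point" is an assumption. Only the rule that 0 disables the timeout comes from the text above.

```python
from typing import Iterable, Iterator, Tuple

Point = Tuple[float, float]  # (timestamp in seconds, value)


def dead_band_sketch(
    points: Iterable[Point], deviation: float, timeout: float = 0
) -> Iterator[Point]:
    """Illustrative deadband filter (hypothetical, not the library's code)."""
    last_kept = None
    for ts, value in points:
        if last_kept is None:
            # Always keep the first point.
            last_kept = (ts, value)
            yield last_kept
            continue
        moved = abs(value - last_kept[1]) > deviation
        # Assumption: timeout == 0 switches the time-based rule off.
        timed_out = timeout > 0 and (ts - last_kept[0]) >= timeout
        if moved or timed_out:
            last_kept = (ts, value)
            yield last_kept


samples = [(0, 10.0), (1, 10.1), (2, 10.2), (3, 11.0), (4, 11.1), (5, 11.05)]
no_timeout = list(dead_band_sketch(samples, deviation=0.5))
with_timeout = list(dead_band_sketch(samples, deviation=0.5, timeout=2))
print(no_timeout)
print(with_timeout)
```

With no timeout, only points that move more than the deviation away from the last kept value survive. With a timeout of 2 seconds, the sketch also keeps a point whenever 2 seconds have elapsed since the last kept one.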
The swinging door algorithm is clearly explained in this presentation,
and in this file.
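For readers without access to those references, the following is a rough pure-Python sketch of the textbook swinging door idea. It is not the code of this library or of the library it is based on; the function name `swinging_door_sketch` is hypothetical, the timeout is left out, and timestamps are assumed to be strictly increasing.

```python
from typing import Iterable, Iterator, Tuple

Point = Tuple[float, float]  # (timestamp in seconds, value)


def swinging_door_sketch(points: Iterable[Point], deviation: float) -> Iterator[Point]:
    """Illustrative swinging door filter (hypothetical, no timeout handling)."""
    it = iter(points)
    try:
        anchor = next(it)
    except StopIteration:
        return
    yield anchor  # the first point is always archived
    held = None  # most recent point that has not been archived yet
    slope_up = float("-inf")  # upper door, hinged at anchor value + deviation
    slope_lo = float("inf")   # lower door, hinged at anchor value - deviation
    for ts, value in it:
        dt = ts - anchor[0]
        # Doors only ever open wider: the upper slope grows, the lower shrinks.
        slope_up = max(slope_up, (value - (anchor[1] + deviation)) / dt)
        slope_lo = min(slope_lo, (value - (anchor[1] - deviation)) / dt)
        if slope_up > slope_lo and held is not None:
            # The doors have opened past parallel: archive the held point
            # and restart the doors from it, using the current point.
            anchor = held
            yield anchor
            dt = ts - anchor[0]
            slope_up = (value - (anchor[1] + deviation)) / dt
            slope_lo = (value - (anchor[1] - deviation)) / dt
        held = (ts, value)
    if held is not None:
        yield held  # close the series with the last point


ramp_then_flat = [(0, 0.0), (1, 1.0), (2, 2.0), (3, 3.0), (4, 3.0), (5, 3.0)]
kept = list(swinging_door_sketch(ramp_then_flat, deviation=0.1))
print(kept)
```

The ramp and the flat section are each reduced to their end points, because every dropped point lies within the deviation of the straight line between its kept neighbours.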
Use the package manager pip to install historian_data_compression.
pip install historian_data_compression

To avoid timestamp issues:

sort the dataframe by timestamp,
and convert negative timestamps (in Windows, dates before 1970-01-01) by adding a number of seconds before the compression, and subtracting them again afterwards.
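The two steps above can be sketched with the standard library alone. The helper names `shift_for_epoch` and `unshift`, and the one-day safety margin, are assumptions made for illustration and are not part of the package.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def shift_for_epoch(timestamps, margin=timedelta(days=1)):
    """Sort timestamps and shift them so that none lies before the epoch.

    Returns the shifted list and the offset that was added (hypothetical helper).
    """
    ordered = sorted(timestamps)
    if not ordered or ordered[0] >= EPOCH:
        return ordered, timedelta(0)
    offset = (EPOCH - ordered[0]) + margin
    return [ts + offset for ts in ordered], offset


def unshift(timestamps, offset):
    """Undo the shift after compression (hypothetical helper)."""
    return [ts - offset for ts in timestamps]


raw = [
    datetime(1971, 3, 1, tzinfo=timezone.utc),
    datetime(1965, 6, 1, tzinfo=timezone.utc),
    datetime(1969, 12, 31, tzinfo=timezone.utc),
]
shifted, offset = shift_for_epoch(raw)
restored = unshift(shifted, offset)
print(offset, shifted[0], restored[0])
```

Keeping the offset as a single value makes it easy to apply the exact same correction to every compressed dataframe afterwards.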

Simple demo (dataframe with 1 significant value column)
import pandas as pd
from datetime import datetime, timedelta
from historian_data_compression import point_generator, dead_band_compression, swinging_door_compression

df = pd.read_csv(r"https://datahub.io/core/natural-gas/r/daily.csv")
df["Date"] = pd.to_datetime(df["Date"], format="%Y-%m-%d %H:%M:%S")

# Sort by timestamp and, if the data start before 1970-01-01, shift them past the epoch.
df = df.sort_values("Date")
first_ts = df["Date"].min().timestamp()
shift = timedelta(0)
if first_ts < 0:
    # first_ts is negative, so shift forward by its magnitude (plus 1 s of margin).
    shift = timedelta(seconds=int(-first_ts) + 1)
    df["Date"] = df["Date"] + shift

# Avoid shadowing the built-ins max() and min().
price_max = df["Price"].max()
price_min = df["Price"].min()
dbc_deadband_perc = 0.5 / 100                                       # typically 0.5 %
dbc_deviation = dbc_deadband_perc * (price_max - price_min) / 2     # deviation = deadband / 2
dbc_timeout = 0                                                     # seconds, but 0 equals 'no timeout'
swdc_deadband_perc = 1 / 100                                        # typically 1.0 %
swdc_deviation = swdc_deadband_perc * (price_max - price_min) / 2
swdc_timeout = 0                                                    # seconds, but 0 equals 'no timeout'

# 1st stage: deadband compression of the raw data.
df_dbc = pd.DataFrame(
    tuple(
        {
            "Date": datetime.fromtimestamp(ts),
            "Price": value
        }
        for ts, value in dead_band_compression(
            point_generator(df[["Date", "Price"]]), deviation=dbc_deviation, timeout=dbc_timeout
        )
    )
)
# 2nd stage: swinging door compression of the deadband result.
df_dbc_swdc = pd.DataFrame(
    tuple(
        {
            "Date": datetime.fromtimestamp(ts),
            "Price": value
        }
        for ts, value in swinging_door_compression(
            point_generator(df_dbc), deviation=swdc_deviation, timeout=swdc_timeout
        )
    )
)
# Undo the timestamp shift, if one was applied.
if first_ts < 0:
    df_dbc["Date"] = df_dbc["Date"] - shift
    df_dbc_swdc["Date"] = df_dbc_swdc["Date"] - shift
print(
    "Size after 1st stage compression (deadband only): "
    f"{len(df_dbc) / len(df):>10.1%}"
)
print(
    "Size after 2nd stage compression (deadband + swinging door):"
    f"{len(df_dbc_swdc) / len(df):>10.1%}"
)

# returns:

Size after 1st stage compression (deadband only): 84.7%
Size after 2nd stage compression (deadband + swinging door): 26.8%
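The arithmetic behind the deviation and the reported sizes in the demo can be isolated in two small helpers. This is only a sketch: the names `deviation_from_deadband` and `size_after_compression` are hypothetical, and the sample numbers are made up rather than taken from the dataset.

```python
def deviation_from_deadband(values, deadband_fraction):
    """Half of the deadband, where the deadband is a fraction of the value range."""
    span = max(values) - min(values)
    return deadband_fraction * span / 2


def size_after_compression(kept_rows, total_rows):
    """Format the share of rows that survive, as in the demo's print calls."""
    return f"{kept_rows / total_rows:>10.1%}"


prices = [2.0, 3.5, 6.0, 4.0]          # made-up values with a range of 4.0
deviation = deviation_from_deadband(prices, 0.5 / 100)
ratio_text = size_after_compression(kept_rows=3, total_rows=12)
print(deviation)
print(ratio_text)
```

A deadband of 0.5 % on a range of 4.0 gives a deadband of 0.02 and therefore a deviation of 0.01 on either side of the last kept value.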

Example with dataframe with multiple significant value columns
import pandas as pd
from datetime import datetime
from historian_data_compression import point_generator, swinging_door_compression

df = pd.read_csv(r"https://datahub.io/core/global-temp/r/monthly.csv")
# One column per source, with the mean temperature as value.
df = pd.pivot(df, index=["Date"], columns=["Source"], values=["Mean"])
df = df.reset_index(drop=False)
df.columns = [c[1] if c[0] == "Mean" else "Date" for c in df.columns]
df["Date"] = pd.to_datetime(df["Date"], format="%Y-%m-%d %H:%M:%S")
cols_float = [c for c in df.columns if df[c].dtype == "float"]
df = df.sort_values("Date")
# If the data start before 1970-01-01, shift them past the epoch (with 100 days of margin).
days = (datetime(1970, 1, 1) - df["Date"].min()).total_seconds() / (60 * 60 * 24)
if days > 0:
    days = int(days) + 100
else:
    days = 0
df["Date"] = df["Date"] + pd.Timedelta(days=days)

# Daily grid onto which the compressed columns are merged.
ix = pd.date_range(start=df["Date"].min(), end=df["Date"].max(), freq='D')
df1 = df.set_index('Date')
df1 = df1.reindex(ix).reset_index(drop=False)
df1.columns = ["Date"] + cols_float

tol = pd.Timedelta("0.5 days")

for col in cols_float:
    # Avoid shadowing the built-ins max() and min().
    col_max = df[col].max()
    col_min = df[col].min()
    swdc_deadband_perc = 5 / 100                                    # 5 % here; 1.0 % is typical
    swdc_deviation = swdc_deadband_perc * (col_max - col_min) / 2
    swdc_timeout = 0                                                # seconds, but 0 equals 'no timeout'

    df_swdc = pd.DataFrame(
        tuple(
            {
                "Date": datetime.fromtimestamp(ts),
                col: value
            }
            for ts, value in swinging_door_compression(
                point_generator(df[["Date", col]]), deviation=swdc_deviation, timeout=swdc_timeout
            )
        )
    )
    df1 = pd.merge_asof(df1, df_swdc, on="Date", direction="nearest", tolerance=tol, suffixes=["", "_compressed"])
# Undo the timestamp shift, if one was applied.
if days > 0:
    df1["Date"] = df1["Date"] - pd.Timedelta(days=days)

df_swdc = df1.dropna(thresh=2).reset_index(drop=True)

df_swdc.plot(x="Date", y="GISTEMP")
df_swdc.plot(x="Date", y="GISTEMP_compressed")

print(
    "Size after swinging door compression: "
    f'{df_swdc["GISTEMP_compressed"].count() / df_swdc["GISTEMP"].count():>10.1%}'
)

# returns:

Size after swinging door compression: 39.9%



