# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import (
bytes, dict, int, list, object, range, str, ascii, chr, hex, input, next,
oct, open, pow, round, super, filter, map, zip
)
import numpy as np
import pkg_resources
import datetime
def lorenz_euler(length, sigma, rho, beta, dt=0.01, start=[1,1,1]):
"""
Simulates the Lorenz system using a simple Euler method
The Lorenz system is a three dimensional dynamical system given
by the following equations:
dx/dt = sigma * (y - x)
dy/dt = rho * x - y - x * z
dz/dt = x * y - beta * z
"""
def lorenz(state, sigma, rho, beta):
x, y, z = state
# NOTE: Numpy 1.x stores intermediate results as float64
# => to achieve consistency between numpy versions, we have to use
# float32 for all values that enter the formula to simulate numpy 1.x
# behavior with numpy 2.x.
return np.array([
np.float32(sigma) * (y - x),
np.float32(rho) * x - y - x * z,
x * y - np.float32(beta) * z
], dtype="float32")
trajectory = np.zeros((length, 3), dtype="float32")
trajectory[0] = start
for i in range(1, length):
# t = i * dt
trajectory[i] = trajectory[i-1] + lorenz(trajectory[i-1], sigma, rho, beta) * dt
return trajectory
def lorenz_lyap(sigma, rho, beta):
"""
Calculates the exact Lyapunov dimension of the Lorenz system according to
Leonov 2015 [ll_1]_.
References:
.. [ll_1] G. A. Leonov and N. V. Kuznetsov, “On differences and similarities in the
analysis of Lorenz, Chen, and Lu systems,” Applied Mathematics and Computation,
vol. 256, pp. 334–343, Apr. 2015, doi: 10.1016/j.amc.2014.12.132.
"""
return 3 - 2 * (sigma + beta + 1) / (sigma + 1 + np.sqrt((sigma-1) ** 2 + 4 * sigma * rho))
[docs]
def fbm(n, H=0.75):
"""
Generates fractional brownian motions of desired length.
Author:
Christian Thomae
References:
.. [fbm_1] https://en.wikipedia.org/wiki/Fractional_Brownian_motion#Method_1_of_simulation
Args:
n (int):
length of sequence to generate
Kwargs:
H (float):
hurst parameter
Returns:
array of float:
simulated fractional brownian motion
"""
# TODO more detailed description of fbm
assert H > 0 and H < 1
def R(t, s):
twoH = 2 * H
return 0.5 * (s**twoH + t**twoH - np.abs(t - s)**twoH)
# form the matrix tau
gamma = R(*np.mgrid[0:n, 0:n]) # apply R to every element in matrix
w, P = np.linalg.eigh(gamma)
L = np.diag(w)
sigma = np.dot(np.dot(P, np.sqrt(L)), np.linalg.inv(P))
v = np.random.randn(n)
return np.dot(sigma, v)
[docs]
def fgn(n, H=0.75):
"""
Generates fractional gaussian noise of desired length.
References:
.. [fgn_1] https://en.wikipedia.org/wiki/Fractional_Brownian_motion
Args:
n (int):
length of sequence to generate
Kwargs:
H (float):
hurst parameter
Returns:
array of float:
simulated fractional gaussian noise
"""
return np.diff(fbm(n+1, H=H))
[docs]
def qrandom(n):
"""
Creates an array of n true random numbers obtained from the quantum random
number generator at qrng.anu.edu.au
This function requires the package quantumrandom and an internet connection.
Args:
n (int):
length of the random array
Return:
array of ints:
array of truly random unsigned 16 bit int values
"""
import quantumrandom
return np.concatenate([
quantumrandom.get_data(data_type='uint16', array_length=1024)
for i in range(int(np.ceil(n/1024.0)))
])[:n]
[docs]
def load_qrandom():
"""
Loads a set of 10000 random numbers generated by qrandom.
This dataset can be used when you want to do some limited tests with "true"
random data without an internet connection.
Returns:
int array
the dataset
"""
fname = "datasets/qrandom.npy"
with pkg_resources.resource_stream(__name__, fname) as f:
return np.load(f)
def load_brown72():
"""
Loads the dataset brown72 with a prescribed Hurst exponent of 0.72
Source: http://bearcave.com/misl/misl_tech/wavelets/hurst/index.html
Returns:
float array:
the dataset
"""
fname = "datasets/brown72.npy"
with pkg_resources.resource_stream(__name__, fname) as f:
return np.load(f)
def load_lorenz_physionet():
"""
Loads a dataset containing the X variable of the Lorenz system
as well as the output of PhysioNet's dfa implementation on that dataset.
The input data was created with the following code:
data = datasets.lorenz_euler(
3000, 10, 28, 8/3.0, start=[0.1,0.1,0.1], dt=0.012
)[1000:,0]
The ouptut from PhysioNet was created by calling:
dfa < lorenz.txt > lorenz_physionet.txt
Returns:
1d float array:
time series of the X variable of the Lorenz system that was used as input
2d float array:
x- and y-coordinates of the line fitting step in the PhysioNet output
"""
fname = "datasets/lorenz.txt"
with pkg_resources.resource_stream(__name__, fname) as f:
data_in = np.loadtxt(f)
fname = "datasets/lorenz_physionet.txt"
with pkg_resources.resource_stream(__name__, fname) as f:
data_out = np.loadtxt(f)
return data_in, data_out
[docs]
def tent_map(x, steps, mu=2):
"""
Generates a time series of the tent map.
Characteristics and Background:
The name of the tent map is derived from the fact that the plot of x_i vs
x_i+1 looks like a tent. For mu > 1 one application of the mapping function
can be viewed as stretching the surface on which the value is located and
then folding the area that is greater than one back towards the zero. This
corresponds nicely to the definition of chaos as expansion in one dimension
which is counteracted by a compression in another dimension.
Calculating the Lyapunov exponent:
The lyapunov exponent of the tent map can be easily calculated as due to
this stretching behavior a small difference delta between two neighboring
points will indeed grow exponentially by a factor of mu in each iteration.
We thus can assume that:
delta_n = delta_0 * mu^n
We now only have to change the basis to e to obtain the exact formula that
is used for the definition of the lyapunov exponent:
delta_n = delta_0 * e^(ln(mu) * n)
Therefore the lyapunov exponent of the tent map is:
lambda = ln(mu)
References:
.. [tm_1] https://en.wikipedia.org/wiki/Tent_map
Args:
x (float):
starting point
steps (int):
number of steps for which the generator should run
Kwargs:
mu (int):
parameter mu that controls the behavior of the map
Returns:
generator object:
the generator that creates the time series
"""
for _ in range(steps):
x = mu * x if x < 0.5 else mu * (1 - x)
yield x
# TODO should all math be formatted like this, or should the documentation of
# logistic_map revert to a version that is more readable as plain text
[docs]
def logistic_map(x, steps, r=4):
r"""
Generates a time series of the logistic map.
Characteristics and Background:
The logistic map is among the simplest examples for a time series that can
exhibit chaotic behavior depending on the parameter r. For r between 2 and
3, the series quickly becomes static. At r=3 the first bifurcation point is
reached after which the series starts to oscillate. Beginning with r = 3.6
it shows chaotic behavior with a few islands of stability until perfect
chaos is achieved at r = 4.
Calculating the Lyapunov exponent:
To calculate the "true" Lyapunov exponent of the logistic map, we first
have to make a few observations for maps in general that are repeated
applications of a function to a starting value.
If we have two starting values that differ by some infinitesimal
:math:`delta_0` then according to the definition of the lyapunov exponent
we will have an exponential divergence:
.. math::
|\delta_n| = |\delta_0| e^{\lambda n}
We can now write that:
.. math::
e^{\lambda n} = \lim_{\delta_0 -> 0} |\frac{\delta_n}{\delta_0}|
This is the definition of the derivative :math:`\frac{dx_n}{dx_0}` of a
point :math:`x_n` in the time series with respect to the starting point
:math:`x_0` (or rather the absolute value of that derivative). Now we can
use the fact that due to the definition of our map as repetitive
application of some f we have:
.. math::
f^{n\prime}(x) = f(f(f(...f(x_0)...))) = f'(x_n-1) \cdot f'(x_n-2)
\cdot ... \cdot f'(x_0)
with
.. math::
e^{\lambda n} = |f^{n\prime}(x)|
we now have
.. math::
e^{\lambda n} &= |f'(x_n-1) \cdot f'(x_n-2) \cdot ... \cdot f'(x_0)| \\
\Leftrightarrow \\
\lambda n &= \ln |f'(x_n-1) \cdot f'(x_n-2) \cdot ... \cdot f'(x_0)| \\
\Leftrightarrow \\
\lambda &= \frac{1}{n} \ln |f'(x_n-1) \cdot f'(x_n-2) \cdot ... \cdot f'(x_0)| \\
&= \frac{1}{n} \sum_{k=0}^{n-1} \ln |f'(x_k)|
With this sum we can now calculate the lyapunov exponent for any map.
For the logistic map we simply have to calculate :math:`f'(x)` and as we
have
.. math::
f(x) = r x (1-x) = rx - rx²
we now get
.. math::
f'(x) = r - 2 rx
References:
.. [lm_1] https://en.wikipedia.org/wiki/Tent_map
.. [lm_2] https://blog.abhranil.net/2015/05/15/lyapunov-exponent-of-the-logistic-map-mathematica-code/
Args:
x (float):
starting point
steps (int):
number of steps for which the generator should run
Kwargs:
r (int):
parameter r that controls the behavior of the map
Returns:
generator object:
the generator that creates the time series
"""
for _ in range(steps):
x = r * x * (1 - x)
yield x
[docs]
def load_financial():
"""
Loads the following datasets from CSV files in this package:
- jkse: Jakarta Composite Index, downloaded on 2019-02-12 from https://finance.yahoo.com/quote/%5EJKSE/history?period1=631148400&period2=988668000&interval=1d&filter=history&frequency=1d
- n225: Nikkei 225, downloaded on 2019-02-12 from https://finance.yahoo.com/quote/%5EN225/history?period1=631148400&period2=988668000&interval=1d&filter=history&frequency=1d
- ndx: NASDAQ 100, downloaded on 2019-02-12 from https://finance.yahoo.com/quote/%5ENDX/history?period1=631148400&period2=988668000&interval=1d&filter=history&frequency=1d
All datasets are daily prices from the period from 1990-01-01 to 2001-05-01
missing values are NaN except for opening values which are treated as
follows:
- If the first opening value is missing, the first *existing* opening value
is used for the first day.
- All other missing opening values are filled by the close value of the last
day where data was available.
Returns:
list of tuple(1d-array, 2d-array):
datasets with days as array of date objects and 2d-array with the columns
"Open", "High", "Low", "Close", "Adj Close", and "Volume". Note that
"Open" values have been padded to ensure that there are no NaNs left.
"""
def load_finance_yahoo_data(f):
f.readline()
days = []
values = []
for l in f:
fields = l.decode("utf-8")
fields = fields.split(",")
d = datetime.datetime.strptime(fields[0], "%Y-%m-%d")
v = [np.nan if x.strip() == "null" else float(x) for x in fields[1:]]
days.append(d)
values.append(v)
return np.array(days), np.array(values)
def pad_opening_values(values):
# fill first value from future if required
first = 0
while np.isnan(values[first, 0]):
first += 1
values[0, 0] = values[first, 0]
# iterate over all indices where data is missing
for i in np.where(np.isnan(values[:, 0]))[0]:
j = i
# pad opening value with close value of previous data
while np.isnan(values[j][3]):
j -= 1
values[i, 0] = values[j, 3]
data = []
for index in ["^JKSE", "^N225", "^NDX"]:
fname = "datasets/{}.csv".format(index)
with pkg_resources.resource_stream(__name__, fname) as f:
days, values = load_finance_yahoo_data(f)
pad_opening_values(values)
data.append((days, values))
return data
[docs]
def barabasi1991_fractal(size, iterations, b1=0.8, b2=0.5):
"""
Generates the simple fractal described in [bf]_.
The fractal divides a rectangular segment starting at (x0, y0) with width w
and height h along the x axis into four line segments of equal size with the
boundary points [x0, x1, x2, x3, x4]. It has two parameters b1 and b2 that
allow to choose the value for y(x1) and y(x3) while it always holds that
y(x0) = y0, y(x2) = y0 and y(x4) = y0 + h.
The process starts with a single line segment of height 1 spanning the whole
data range. In each iteration, the rectangles spanning the line segments
from the previous iteration are subdivided according to the same rule.
References:
.. [bf] A.-L. Barabási and T. Vicsek, “Multifractality of self-affine
fractals,” Physical Review A, vol. 44, no. 4, pp. 2730–2733, 1991.
Args:
size (int):
number of data points in the resulting array
iterations (int):
number of iterations to perform
Kwargs:
b1 (float):
relative height at x1 (between 0 and 1)
b2 (float):
relative height at x3 (between 0 and 1)
Returns:
(1d-array of float):
generated fractal
"""
def b1991(x0, y0, w, h):
if h < 0:
# for a segment with negative slope we have flip the x-axis
d, nxtp = b1991(x0, y0 + h, w, -h)
return d[::-1], nxtp
x1 = x0 + w // 4
x2 = x0 + w // 2
x3 = x2 + w // 4
x4 = x0 + w
data = np.zeros(w, dtype=np.float64)
data[x0 - x0:x1 - x0] = np.linspace(0, 1, x1 - x0) * b1 * h + y0
data[x1 - x0:x2 - x0] = np.linspace(1, 0, x2 - x1) * b1 * h + y0
data[x2 - x0:x3 - x0] = np.linspace(0, 1, x3 - x2) * b2 * h + y0
data[x3 - x0:x4 - x0] = np.linspace(0, 1, x4 - x3) * (1 - b2) * h \
+ y0 + b2 * h
return data, [x0, x1, x2, x3, x4]
fractal = np.linspace(0, 1, size)
intervals = [(0, size)]
for _ in range(iterations):
next_intervals = []
for x1, x2 in intervals:
d, nxtp = b1991(x1, fractal[x1], x2 - x1, fractal[x2-1] - fractal[x1])
fractal[x1:x2] = d
next_intervals.extend(
[(np1, np2) for np1, np2 in zip(nxtp[:-1], nxtp[1:])]
)
intervals = next_intervals
return fractal
brown72 = load_brown72()
jkse, n225, ndx = load_financial()