MAP Functions
MAP functions are the core of data transformation.
TAKE()
Syntax: TAKE( [offset,] n )
Takes the first n records and stops the stream.
offset number, optional. Takes records starting from the offset (default 0 when omitted). Since v8.0.6
n number. Specifies how many records to take.
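A minimal sketch of TAKE() with an offset (the FAKE() source is only for illustration):

```
FAKE( linspace(0, 1, 100) )
TAKE(5, 10)   // skip the first 5 records, then take the next 10 and stop the stream
CSV()
```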
DROP()
Syntax: DROP( [offset,] n )
Ignores the first n records; it simply drops them.
offset number, optional. Drops records starting from the offset (default 0 when omitted). Since v8.0.6
n number. Specifies how many records to drop.
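A similar sketch for DROP(), again using an illustrative FAKE() source:

```
FAKE( linspace(0, 1, 100) )
DROP(90)      // discard the first 90 records and pass the rest through
CSV()
```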
MAPKEY()
Syntax: MAPKEY( newkey )
Replaces the current key with the given newkey.
Example result:
1701343504143299000,TAG0,1628694000000000000,10
1701343504143303000,TAG0,1628780400000000000,11
1701343504143308000,TAG0,1628866800000000000,12
1701343504143365000,TAG0,1628953200000000000,13
1701343504143379000,TAG0,1629039600000000000,14
1701343504143383000,TAG0,1629126000000000000,15
PUSHKEY()
Syntax: PUSHKEY( newkey )
Applies a new key to each record. The original key is pushed into the value tuple.
For example, if an original record was {key: 'k1', value: [v1, v2]} and PUSHKEY(newkey) is applied, it produces the updated record {key: newkey, value: [k1, v1, v2]}.
Example result:
1701343504143299000,TAG0,1628694000000000000,10
1701343504143303000,TAG0,1628780400000000000,11
1701343504143308000,TAG0,1628866800000000000,12
1701343504143365000,TAG0,1628953200000000000,13
1701343504143379000,TAG0,1629039600000000000,14
1701343504143383000,TAG0,1629126000000000000,15
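For illustration, a minimal sketch combining MAPKEY() and PUSHKEY() (not necessarily the pipeline that produced the rows above; the FAKE() source and key values are assumptions):

```
FAKE( linspace(0, 1, 3) )
MAPKEY( key() * 100 )     // replace each record's key with 100x the original key
PUSHKEY( 'sample' )       // set the key to 'sample'; the previous key becomes the first value
CSV()
```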
POPKEY()
Syntax: POPKEY( [idx] )
Drops the current key of the record, then promotes the idx-th element of the value tuple as the new key.
For example, if an original record was {key: k, value: [v1, v2, v3]} and POPKEY(1) is applied, it produces the updated record {key: v2, value: [v1, v3]}.
If POPKEY() is used without an argument, it is equivalent to POPKEY(0), which promotes the first element of the value tuple as the key.
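A minimal POPKEY() sketch (the FAKE() source is illustrative):

```
FAKE( linspace(0, 1, 3) )
PUSHKEY( 'sample' )       // key: 'sample', value: [original_key, original_value]
POPKEY()                  // promote the first value back to the key
CSV()
```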
GROUPBYKEY()
Syntax: GROUPBYKEY( [lazy(boolean)] )
lazy(boolean) If set to false (the default), GROUPBYKEY() yields a new grouped record whenever the key of the incoming record differs from the previous record. If set to true, GROUPBYKEY() waits for the end of the input stream before yielding any record.
GROUPBYKEY is an equivalent expression to GROUP( by( key() ) ).
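A minimal GROUPBYKEY() sketch (the FAKE() source and the constant key are assumptions):

```
FAKE( linspace(0, 2, 3) )
MAPKEY( 'sample' )        // give every record the same key
GROUPBYKEY()              // one record whose value is the array of the grouped value tuples
CSV()
```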
GROUP()
Syntax: GROUP( [lazy(boolean),] by [, aggregators...] )
Since v8.0.7
lazy(boolean) If set to false (the default), GROUP() yields a new aggregated record whenever the value of by() changes from the previous record. If set to true, GROUP() waits for the end of the input stream before yielding any record.
by(value [, name]) The value by which the records are grouped.
aggregators array of aggregators (aggregate functions). If no aggregator is specified, GROUP makes a new array of the raw records for each group by default.
Takes multiple continuous records that have the same key, then produces a new record whose value array contains all of the individual values. For example, if the original records were {key:k, value:[v1, v2]}, {key:k, value:[v3, v4]} … {key:k, value:[vx, vy]}, GROUP( by(value()) ) produces the new record {key:k, value:[[v1,v2],[v3,v4],...,[vx,vy]]}.
GROUP() works by comparing the value of by() of the current record to the previous one; when it finds that the value of by() has changed, it produces a new record. As a result, it can make a group only from continuous records that share the same by() value.
If aggregators exist, GROUP yields the result value of each aggregate function as below.
aggregator | description |
---|---|
avg(value [, name]) | average |
sum(value [, name]) | total sum |
first(value [, name]) | the first value of the elements |
last(value [, name]) | the last value of the elements |
min(value [, name]) | the min. value of the elements |
max(value [, name]) | the max. value of the elements |
rss(value [, name]) | root sum square |
rms(value [, name]) | root mean square |
The aggregators below differ from the functions above in that they hold all the values of the corresponding group in a memory buffer, then generate the value and yield the record when the group changes.
aggregator | description |
---|---|
mean(value [, name]) | mean |
median(value [, name]) | median (lower value) |
medianInterpolated(value [,name]) | median (lower interpolated value) |
stddev(value [, name]) | standard deviation |
stderr(value [, name]) | standard error |
entropy(value [, name]) | Shannon entropy of a distribution. The natural logarithm is used. |
mode(value [, name]) | The most common value in the dataset. Strict float64 equality is used when comparing values, so users should take caution. If several values are the mode, any of them may be returned. |
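A hedged sketch of GROUP() with aggregators (the FAKE() source, the grouping expression, and the column names are assumptions):

```
FAKE( linspace(1, 10, 10) )
// two consecutive groups: records with value <= 5, then records with value > 5
GROUP( by( value(0) > 5, 'upper' ),
       avg( value(0), 'avg' ),
       max( value(0), 'max' ) )
CSV()
```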
FLATTEN()
Syntax: FLATTEN()
It works in the opposite way of GROUPBYKEY(). It takes a record whose value is a multi-dimensional tuple and produces multiple records, one for each element of the tuple, reducing the dimension.
For example, if an original record was {key:k, value:[[v1,v2],[v3,v4],...,[vx,vy]]}, it produces the new records {key:k, value:[v1, v2]}, {key:k, value:[v3, v4]} … {key:k, value:[vx, vy]}.
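A minimal sketch showing FLATTEN() undoing GROUPBYKEY() (the FAKE() source is illustrative):

```
FAKE( linspace(0, 2, 3) )
MAPKEY( 'sample' )
GROUPBYKEY()              // value becomes [[0],[1],[2]]
FLATTEN()                 // back to three records: [0], [1], [2]
CSV()
```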
PUSHVALUE()
Syntax: PUSHVALUE( idx, value [, name] )
Since v8.0.5
idx int. Index where the new value is inserted (0 based).
value expression. The new value.
name string. The column's name (default 'column').
Inserts the given value into the current values.
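A minimal PUSHVALUE() sketch (the source and the column name 'double' are illustrative):

```
FAKE( linspace(0, 1, 3) )
PUSHVALUE( 1, value(0) * 2, 'double' )   // insert a doubled copy of the first value at index 1
CSV()
```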
POPVALUE()
Syntax: POPVALUE( idx [, idx2, idx3, ...] )
Since v8.0.5
idx int. One or more indexes to be removed from the values.
It removes the elements specified by the given indexes from the value array.
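A minimal POPVALUE() sketch building on the previous example:

```
FAKE( linspace(0, 1, 3) )
PUSHVALUE( 1, value(0) * 2, 'double' )
POPVALUE( 0 )             // remove the original first value, keeping only 'double'
CSV()
```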
MAPVALUE()
Syntax: MAPVALUE( idx, newValue [, newName] )
idx int. Index of the value tuple (0 based).
newValue expression. The new value.
newName string. Changes the column's name to the given string.
Replaces the value of the element at the given index. For example, MAPVALUE(0, value(0)*10) replaces the first element of the value tuple with a value 10 times larger.
If the idx is out of range, it works as PUSHVALUE() does. MAPVALUE(-1, value(1)+'_suffix') inserts a new string value that concatenates '_suffix' with the 2nd element of the value.
An example usage of math functions with MAPVALUE appears in the Example at the end of this section.
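In the meantime, a simpler sketch using plain arithmetic (the source and the column name 'percent' are assumptions):

```
FAKE( linspace(0, 1, 5) )
MAPVALUE( 0, value(0) * 100, 'percent' )   // scale the first value and rename its column
CSV()
```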
FILTER()
Syntax: FILTER( condition )
Applies the condition to the incoming record, then passes the record only if the condition is true.
For example, if an original record was {key: k1, value: [v1, v2]} and FILTER(count(V) > 2) is applied, it simply drops the record. If the condition were FILTER(count(V) >= 2), it would pass the record to the next function.
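For instance, a sketch like the following (the FAKE() source and the threshold are assumptions) keeps only the records whose third value is less than 12:

```
FAKE( json({
    ["TAG0", 1628694000000000000, 10],
    ["TAG0", 1628780400000000000, 11],
    ["TAG0", 1628866800000000000, 12]
}) )
FILTER( value(2) < 12 )
CSV()
```

It would yield a result like the rows below.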
TAG0,1628694000000000000,10
TAG0,1628780400000000000,11
TIMEWINDOW()
Syntax: TIMEWINDOW( fromTime, untilTime, period, [nullValue], fields...)
Since v8.0.5
Aggregates raw values between fromTime and untilTime into periodic durations, yielding an alternative value (NULL by default, or the given nullValue) when no actual value exists for a period.
fromTime time, from (inclusive)
untilTime time, until (exclusive)
period duration, e.g. period('1s')
nullValue if a certain period has no actual values, it yields the given alternative value (default is NULL), e.g. nullValue(alternativeValue)
fields string. Specifies each field's aggregation function and indicates which column is the time. It should be one of the pre-defined keywords.
field | description |
---|---|
time | indicator for timestamp column |
avg | average |
sum | total sum |
first , last | the first / last of elements |
min , max | min / max |
rss | root sum square |
rms | root mean square |
The fields below differ from the functions above in that they hold all the values of the corresponding period in a memory buffer and generate the value when the time window changes.
field | description |
---|---|
mean | mean |
median | median (lower value) |
median-interpolated | median (lower interpolated value) |
stddev | standard deviation |
stderr | standard error |
entropy | Shannon entropy of a distribution. The natural logarithm is used. |
mode | Since v8.0.7 The most common value in the dataset. Strict float64 equality is used when comparing values, so users should take caution. If several values are the mode, any of them may be returned. |
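A pseudo-code sketch of TIMEWINDOW() (the SQL source, the table, and the time range are assumptions):

```
// pseudo code
SQL( `select time, value from example where name = 'my-sensor'` )
TIMEWINDOW(
    fromTime,        // e.g. the start of the range (inclusive)
    untilTime,       // e.g. the end of the range (exclusive)
    period('1h'),
    nullValue(0),
    'time', 'avg'    // the first column is the timestamp, the second is averaged per period
)
CSV()
```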
FFT()
Syntax: FFT()
It assumes the value of the incoming record is an array of (time, amplitude) tuples, applies the Fast Fourier Transform on the array, and replaces the value with an array of (frequency, amplitude) tuples. The key remains the same.
For example, if the incoming record was {key: k, value: [ [t1,a1],[t2,a2],...[tn,an] ]}, it transforms the value to {key: k, value: [ [F1,A1], [F2,A2],...[Fm,Am] ]}.
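A pseudo-code pipeline showing where FFT() typically sits (the source is a placeholder):

```
// pseudo code
FAKE( ... a source yielding [time, amplitude] records ... )
MAPKEY( 'sample' )   // give every record the same key
GROUPBYKEY()         // value becomes [[t1,a1],[t2,a2],...,[tn,an]]
FFT()                // value becomes [[F1,A1],[F2,A2],...,[Fm,Am]]
CSV()
```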
WHEN()
Syntax: WHEN(condition, doer)
Since v8.0.7
condition boolean
doer doer
WHEN runs the doer action if the given condition is true.
This function does not affect the flow of records; it only executes the defined side-effect work.
doLog()
Syntax: doLog(args...)
Since v8.0.7
Prints a log message on the web console.
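A minimal sketch (the FAKE() source and the condition are assumptions):

```
FAKE( linspace(1, 3, 3) )
WHEN( value(0) > 2, doLog("value:", value(0)) )   // log to the web console for matching records
CSV()
```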
doHttp()
Syntax: doHttp(method, url, body [, header...])
Since v8.0.7
method string
url string
body string
header string, optional
doHttp requests the HTTP endpoint with the given method, url, body, and headers.
Use cases
- Notify an event to the specific HTTP endpoint.
- Post the current record to the specific HTTP endpoint in CSV, which is the default format of doHttp (see the sketch after this list).
- Post the current record in a custom JSON format to the specific HTTP endpoint.
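A hedged sketch covering the last two use cases (the endpoint URLs are placeholders, and the JSON body and header format are assumptions):

```
FAKE( linspace(0, 10, 11) )
// post every record to an endpoint; doHttp sends the body in CSV format by default
WHEN( true, doHttp("POST", "http://my_server/records", value()) )
// post a custom JSON body, built with strSprintf, when the value exceeds a threshold
WHEN( value(0) > 9,
    doHttp("POST", "http://my_server/alerts",
        strSprintf(`{"value": %f}`, value(0)),
        "Content-Type: application/json") )
CSV()
```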
do()
Syntax: do(args..., { sub-flow-code })
Since v8.0.7
do executes the given sub-flow code, passing the args... arguments.
It is important to keep in mind that WHEN() is only for executing a side-effect job on a certain condition.
A WHEN-do sub flow cannot affect the main flow, which means it cannot use SINKs that produce results on the output stream, such as CSV, JSON, and CHART_*. The output of a sub flow is ignored; any writing attempts from such a sink are dropped and warning messages are shown.
Effective SINKs in a sub flow are INSERT and APPEND, which are not related to the output stream, so a sub flow can write specific values to a different table than the main TQL flow. Otherwise, use the DISCARD() sink, which silently discards any records in the sub flow without warning messages.
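A sketch consistent with the compiled tasks and the log output below (the FAKE() source and the greeting values are assumptions):

```
FAKE( json({ ["你好"], ["世界"] }) )
WHEN( true,
    do( value(0), {
        ARGS()                                      // receives the arguments passed by do()
        WHEN( true, doLog("Greetings:", args(0)) )  // log each greeting
        DISCARD()                                   // sub flows should not write to the output stream
    })
)
CSV()
```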
The log messages of the code above show two important points.
- The main flow is blocked and waits until its sub flow finishes the job.
- The sub flow is executed for every record that matches the condition.
2023-12-02 07:54:42.160 TRACE 0xc000bfa580 Task compiled FAKE() → WHEN() → CSV()
2023-12-02 07:54:42.160 TRACE 0xc000bfa840 Task compiled ARGS() → WHEN() → DISCARD()
2023-12-02 07:54:42.160 INFO 0xc000bfa840 Greetings: 你好
2023-12-02 07:54:42.160 DEBUG 0xc000bfa840 Task elapsed 254.583µs
2023-12-02 07:54:42.161 TRACE 0xc000bfa9a0 Task compiled ARGS() → WHEN() → DISCARD()
2023-12-02 07:54:42.161 INFO 0xc000bfa9a0 Greetings: 世界
2023-12-02 07:54:42.161 DEBUG 0xc000bfa9a0 Task elapsed 190.552µs
2023-12-02 07:54:42.161 DEBUG 0xc000bfa580 Task elapsed 1.102681ms
Use cases
When a sub flow retrieves data from a source other than its arguments, it can still access the arguments with the args([idx]) option function.
- Execute query with sub flow’s arguments.
// pseudo code
// ...
WHEN( condition,
do(value(0), {
SQL(`select time, value from table where name = ?`, args(0))
// ... some map functions...
INSERT(...)
})
)
// ...
- Retrieve a CSV file from an external web server
// pseudo code
// ...
WHEN( condition,
do(value(0), value(1), {
CSV( file( strSprintf("https://example.com/data_%s.csv?id=%s", args(0), escapeParam(args(1)) )))
WHEN(true, doHttp("POST", "http://my_server", value()))
DISCARD()
})
)
// ...
SCRIPT()
Syntax: SCRIPT({ ... script code... })
Supports user-defined script languages.
See SCRIPT section for the details with examples.
tengo
tengo is a Go-like script language. In addition to tengo's default packages, a "context" package is available that exposes TQL-specific functionality.
TQL can execute a user-defined script within SCRIPT() by passing the code inside { and }.
If the script calls neither context.yield() nor context.drop() explicitly for a record, the record is passed to the next function without any changes. A context in the code of SCRIPT() provides several methods.
method | description |
---|---|
ctx.key() | returns the key of the current record |
ctx.value(idx) | returns the idx-th value of the current record, or the whole value array if idx is omitted |
ctx.drop() | discards the current record |
ctx.yield(values...) | produces a new record from the given values… arguments |
ctx.yieldKey(key, values...) | produces a new record from the given key and values… |
ctx.param(name [, default]) | returns the value of the request parameter for the given name. If the parameter specified by name does not exist, it returns the default string. If default is not specified, it returns the empty string "" |
ctx.bridge(name) | returns bridge instance of the given name |
ctx.println(args...) | print log message to the web console |
context package
context.yield()
Yields the given arguments to the next function as a new record.
context.key()
Returns the key of the current record.
context.value()
Returns the whole value of the current record as an array. If an index is given, it returns that element of the values.
For example, if the current record is [0, true, "hello", "world"]:
- context.value() returns the whole value array: [0, true, "hello", "world"]
- context.value(0) returns the first element: 0
- context.value(3) returns the last element: "world"
times package
times.time_format()
Convert timestamp to a string.
times.parse()
Convert a string to timestamp.
json package
json.decode()
Parse JSON string.
If the script ends with APPEND(...) or INSERT(...) instead of CSV(), the final result records are written into the database.
Example
Open a new TQL editor on the web UI, copy the code below, and run it.
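The following is a sketch consistent with the description below (the computed third value is only an illustration, not necessarily the original example):

```
FAKE( meshgrid( linspace(-4, 4, 100), linspace(-4, 4, 100) ) )
SCRIPT({
    ctx := import("context")
    math := import("math")
    v := ctx.value()                  // v[0], v[1] are the two meshed coordinates
    // yield the two coordinates plus an illustrative computed third value
    ctx.yield(v[0], v[1], math.sin(v[0]*v[0] + v[1]*v[1]))
})
CSV()
```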
In this example, linspace(-4,4,100) generates an array of 100 elements ranging from -4.0 to 4.0 in steps of 8/100. meshgrid() takes two arrays and produces a new meshed array. As a result, FAKE() in this example produces 10,000 records (100 x 100 meshed), each containing an array of two floating-point numbers.
The SCRIPT() function takes a code block enclosed by { and } and runs it for each record.
It takes the current record via context.value(), then yields the transformed data via context.yield().
The SCRIPT code above is equivalent to the TQL MAPVALUE(2, ...) expression below.
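A hedged equivalent of the sketch above, assuming sin() is among the TQL math functions mentioned below:

```
FAKE( meshgrid( linspace(-4, 4, 100), linspace(-4, 4, 100) ) )
MAPVALUE( 2, sin(value(0)*value(0) + value(1)*value(1)) )   // append the computed third value
CSV()
```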
The math functions used in MAPVALUE have been available since v8.0.6.