MAP Functions

MAP functions are the core of data transformation.

TAKE()

Syntax: TAKE( [offset,] n )

Takes the first n records, then stops the stream.

  • offset number optional; take records starting from this offset. (default 0 when omitted) Since v8.0.6
  • n number specifies how many records to take.

DROP()

Syntax: DROP( [offset,] n )

Ignores the first n records; it simply drops them.

  • offset number optional; drop records starting from this offset. (default 0 when omitted) Since v8.0.6
  • n number specifies how many records to drop.
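
The offset behavior of TAKE() and DROP() can be modeled outside TQL. The following Python sketch is not TQL and not the engine's implementation; the helper names `take` and `drop` are hypothetical, and it assumes that offset counts records from the start of the stream.

```python
from itertools import islice

def take(records, n, offset=0):
    """Model of TAKE([offset,] n): skip `offset` records, then pass n records."""
    return list(islice(records, offset, offset + n))

def drop(records, n, offset=0):
    """Model of DROP([offset,] n): pass `offset` records, drop the next n, pass the rest."""
    return [r for i, r in enumerate(records) if not (offset <= i < offset + n)]

stream = list(range(1, 11))        # ten records: 1..10
print(take(stream, 3))             # [1, 2, 3]
print(take(stream, 3, offset=2))   # [3, 4, 5]
print(drop(stream, 3))             # [4, 5, 6, 7, 8, 9, 10]
print(drop(stream, 3, offset=2))   # [1, 2, 6, 7, 8, 9, 10]
```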

MAPKEY()

Syntax: MAPKEY( newkey )

Replaces the current key with the given newkey.

FAKE( json({
    [ "TAG0", 1628694000000000000, 10],
    [ "TAG0", 1628780400000000000, 11],
    [ "TAG0", 1628866800000000000, 12],
    [ "TAG0", 1628953200000000000, 13],
    [ "TAG0", 1629039600000000000, 14],
    [ "TAG0", 1629126000000000000, 15]
}))
MAPKEY(time("now"))
PUSHKEY("do-not-see")
CSV()
1701343504143299000,TAG0,1628694000000000000,10
1701343504143303000,TAG0,1628780400000000000,11
1701343504143308000,TAG0,1628866800000000000,12
1701343504143365000,TAG0,1628953200000000000,13
1701343504143379000,TAG0,1629039600000000000,14
1701343504143383000,TAG0,1629126000000000000,15

PUSHKEY()

Syntax: PUSHKEY( newkey )

Applies a new key to each record. The original key is pushed to the front of the value tuple.

For example, if the original record is {key: 'k1', value: [v1, v2]} and PUSHKEY(newkey) is applied, it produces the updated record {key: newkey, value: [k1, v1, v2]}.

FAKE( json({
    [ "TAG0", 1628694000000000000, 10],
    [ "TAG0", 1628780400000000000, 11],
    [ "TAG0", 1628866800000000000, 12],
    [ "TAG0", 1628953200000000000, 13],
    [ "TAG0", 1629039600000000000, 14],
    [ "TAG0", 1629126000000000000, 15]
}))
MAPKEY(time("now"))
PUSHKEY("do-not-see")
CSV()
1701343504143299000,TAG0,1628694000000000000,10
1701343504143303000,TAG0,1628780400000000000,11
1701343504143308000,TAG0,1628866800000000000,12
1701343504143365000,TAG0,1628953200000000000,13
1701343504143379000,TAG0,1629039600000000000,14
1701343504143383000,TAG0,1629126000000000000,15

POPKEY()

Syntax: POPKEY( [idx] )

Drops the current key of the record, then promotes the element at index idx of the value tuple to be the new key.

For example, if the original record is {key: k, value: [v1, v2, v3]} and POPKEY(1) is applied, it produces the updated record {key: v2, value: [v1, v3]}.

Using POPKEY() without an argument is equivalent to POPKEY(0), which promotes the first element of the value tuple to be the key.
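
The key-handling functions above can be pictured as simple operations on a (key, value tuple) pair. The Python sketch below only models the record shapes given in the examples; the function names `mapkey`, `pushkey`, and `popkey` are hypothetical stand-ins and this is not how TQL is implemented.

```python
def mapkey(record, newkey):
    """Model of MAPKEY(newkey): replace the key, leave the values alone."""
    _, value = record
    return (newkey, list(value))

def pushkey(record, newkey):
    """Model of PUSHKEY(newkey): old key moves to the front of the values."""
    key, value = record
    return (newkey, [key] + list(value))

def popkey(record, idx=0):
    """Model of POPKEY([idx]): the value at idx becomes the key."""
    _, value = record
    value = list(value)
    new_key = value.pop(idx)
    return (new_key, value)

rec = ("k1", ["v1", "v2", "v3"])
print(mapkey(rec, "newkey"))   # ('newkey', ['v1', 'v2', 'v3'])
print(pushkey(rec, "newkey"))  # ('newkey', ['k1', 'v1', 'v2', 'v3'])
print(popkey(rec, 1))          # ('v2', ['v1', 'v3'])
print(popkey(rec))             # ('v1', ['v2', 'v3'])
```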

GROUPBYKEY()

Syntax: GROUPBYKEY( [lazy(boolean)] )

  • lazy(boolean) If set to false, which is the default, GROUPBYKEY() yields a new grouped record whenever the key of the incoming record differs from that of the previous record. If set to true, GROUPBYKEY() waits for the end of the input stream before yielding any record.

GROUPBYKEY() is equivalent to GROUP( by( key() ) ).

GROUP()

Syntax: GROUP( [lazy(boolean),] by [, aggregators...] ) Since v8.0.7

  • lazy(boolean) If set to false, which is the default, GROUP() yields a new aggregated record whenever the value of by() differs from that of the previous record. If set to true, GROUP() waits for the end of the input stream before yielding any record.

  • by(value [, name]) The value by which records are grouped.

  • aggregators array of aggregator Aggregate functions

If no aggregator is specified, GROUP() by default collects the raw records of each group into a new array. It takes multiple consecutive records that have the same key and produces a new record whose value is an array containing all the individual values. For example, if the original records are {key:k, value:[v1, v2]}, {key:k, value:[v3, v4]}, ..., {key:k, value:[vx, vy]}, GROUP( by(value()) ) produces the new record {key:k, value:[[v1,v2],[v3,v4],...,[vx,vy]]}.

GROUP() compares the by() value of the current record with that of the previous one and produces a new record when the value has changed. As a result, records are grouped together only when consecutive records share the same by() value.
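
The consequence of comparing each record only with its predecessor is easiest to see on a stream where the same key reappears later. The following Python sketch models the default (non-lazy) behavior with `itertools.groupby`; the function name `group_by_key` is hypothetical, and the lazy mode is not modeled here.

```python
from itertools import groupby

def group_by_key(records):
    """Model of non-lazy grouping: emit one group each time the key changes."""
    return [
        (key, [value for _, value in group])
        for key, group in groupby(records, key=lambda rec: rec[0])
    ]

records = [
    ("a", [1, 2]),
    ("a", [3, 4]),
    ("b", [5, 6]),
    ("a", [7, 8]),   # same key as the first two, but not adjacent to them
]
for grouped in group_by_key(records):
    print(grouped)
# ('a', [[1, 2], [3, 4]])
# ('b', [[5, 6]])
# ('a', [[7, 8]])
```

In this model the last record forms its own group because a record with key "b" sits between it and the earlier "a" records.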

If aggregators are given, GROUP() yields the results of the aggregate functions listed below.

| aggregator | description |
|---|---|
| avg(value [, name]) | average |
| sum(value [, name]) | total sum |
| first(value [, name]) | the first value of the elements |
| last(value [, name]) | the last value of the elements |
| min(value [, name]) | the min. value of the elements |
| max(value [, name]) | the max. value of the elements |
| rss(value [, name]) | root sum square |
| rms(value [, name]) | root mean square |

The aggregators below differ from the ones above in that they hold all the values of the corresponding group in a memory buffer, then compute the result and yield the record when the key changes.

| aggregator | description |
|---|---|
| mean(value [, name]) | mean |
| median(value [, name]) | median (lower value) |
| medianInterpolated(value [, name]) | median (lower interpolated value) |
| stddev(value [, name]) | standard deviation |
| stderr(value [, name]) | standard error |
| entropy(value [, name]) | Shannon entropy of a distribution. The natural logarithm is used. |
| mode(value [, name]) | The most common value in the dataset. Strict float64 equality is used when comparing values, so users should take caution. If several values are the mode, any of them may be returned. |
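
The difference between the two groups of aggregators comes down to how much state each one needs. The Python sketch below illustrates the idea with hypothetical classes: a running rss/rms that keeps only a count and a sum of squares, and a median that has to buffer every value. It uses the standard definitions of root sum square and root mean square and `statistics.median_low` as a stand-in for "median (lower value)"; it is not the engine's code.

```python
import math
import statistics

class RunningSquares:
    """Streaming state: constant memory regardless of group size."""
    def __init__(self):
        self.count = 0
        self.sum_sq = 0.0

    def add(self, x):
        self.count += 1
        self.sum_sq += x * x

    def rss(self):
        return math.sqrt(self.sum_sq)               # root sum square

    def rms(self):
        return math.sqrt(self.sum_sq / self.count)  # root mean square

class BufferedMedian:
    """Buffered state: memory grows with the number of values in the group."""
    def __init__(self):
        self.values = []

    def add(self, x):
        self.values.append(x)

    def result(self):
        return statistics.median_low(self.values)

squares, median = RunningSquares(), BufferedMedian()
for x in [3.0, 4.0]:
    squares.add(x)
    median.add(x)
print(squares.rss())    # 5.0
print(median.result())  # 3.0
```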

FLATTEN()

Syntax: FLATTEN()

It works the opposite way of GROUPBYKEY(). It takes a record whose value is a multi-dimensional tuple and produces one record for each element of the tuple, reducing the dimension.

For example, if the original record is {key:k, value:[[v1,v2],[v3,v4],...,[vx,vy]]}, it produces the new records {key:k, value:[v1, v2]}, {key:k, value:[v3, v4]}, ..., {key:k, value:[vx, vy]}.
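
As a rough illustration of the example above, the Python sketch below unpacks one level of nesting and repeats the key on every produced record. The function name `flatten` is a hypothetical stand-in, not the TQL implementation.

```python
def flatten(record):
    """Model of FLATTEN(): one output record per element of the value tuple."""
    key, value = record
    return [(key, element) for element in value]

grouped = ("k", [["v1", "v2"], ["v3", "v4"], ["vx", "vy"]])
for rec in flatten(grouped):
    print(rec)
# ('k', ['v1', 'v2'])
# ('k', ['v3', 'v4'])
# ('k', ['vx', 'vy'])
```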

PUSHVALUE()

Syntax: PUSHVALUE( idx, value [, name] ) Since v8.0.5

  • idx int Index at which the new value is inserted. (0 based)
  • value expression New value
  • name string column’s name (default ‘column’)

Insert the given value into the current values.

POPVALUE()

Syntax: POPVALUE( idx [, idx2, idx3, ...] ) Since v8.0.5

  • idx int one or more indexes of the elements to remove from the values

It removes the elements specified by the given indexes from the value array.
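
The effect of PUSHVALUE() and POPVALUE() on the value array can be sketched with plain list operations. The Python below is an illustration only; the function names are hypothetical, and it assumes that the indexes passed to POPVALUE() all refer to positions in the array as it was before any removal.

```python
def pushvalue(values, idx, new_value):
    """Model of PUSHVALUE(idx, value): insert new_value at position idx."""
    out = list(values)
    out.insert(idx, new_value)
    return out

def popvalue(values, *idxs):
    """Model of POPVALUE(idx, ...): remove the elements at the given positions."""
    remove = set(idxs)
    return [v for i, v in enumerate(values) if i not in remove]

values = ["TAG0", 1628694000000000000, 10]
print(pushvalue(values, 0, "new"))  # ['new', 'TAG0', 1628694000000000000, 10]
print(popvalue(values, 0, 1))       # [10]
```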

MAPVALUE()

Syntax: MAPVALUE( idx, newValue [, newName] )

  • idx int Index of the value tuple. (0 based)
  • newValue expression New value
  • newName string change column’s name with given string

Replaces the value of the element at the given index. For example, MAPVALUE(0, value(0)*10) replaces the first element of the value tuple with a value 10 times as large.

If idx is out of range, it works as PUSHVALUE() does. MAPVALUE(-1, value(1)+'_suffix') inserts a new string value that appends ‘_suffix’ to the 2nd element of the value tuple.

An example usage of math functions with MAPVALUE.

FILTER()

Syntax: FILTER( condition )

Applies the condition to each incoming record and passes the record on only if the condition is true.

For example, if the original record is {key: k1, value: [v1, v2]} and FILTER(count(V) > 2) is applied, the record is simply dropped. If the condition is FILTER(count(V) >= 2), the record is passed to the next function.

FAKE( json({
    [ "TAG0", 1628694000000000000, 10],
    [ "TAG0", 1628780400000000000, 11],
    [ "TAG0", 1628866800000000000, 12],
    [ "TAG0", 1628953200000000000, 13],
    [ "TAG0", 1629039600000000000, 14],
    [ "TAG0", 1629126000000000000, 15]
}))
FILTER( value(2) < 12 )
CSV()
TAG0,1628694000000000000,10
TAG0,1628780400000000000,11

TIMEWINDOW()

Syntax: TIMEWINDOW( fromTime, untilTime, period, [nullValue], fields...) Since v8.0.5

Aggregates raw values between fromTime and untilTime into periodic windows, and fills in a null value (see nullValue below) for any period that has no values.

  • fromTime time from (inclusive)
  • untilTime time until (exclusive)
  • period duration ex: period('1s')
  • nullValue if a certain period has no actual values, it yields the given alternativeValue. (default is NULL) ex: nullValue(alternativeValue)
  • fields string specifies each field’s aggregation function and indicates which column is the time. Each must be one of the predefined keywords.

| field | description |
|---|---|
| time | indicator for timestamp column |
| avg | average |
| sum | total sum |
| first, last | the first / last of elements |
| min, max | min / max |
| rss | root sum square |
| rms | root mean square |

The fields below differ from the ones above in that they hold all the values of the corresponding period in a memory buffer and compute the result when the time window changes.

| field | description |
|---|---|
| mean | mean |
| median | median (lower value) |
| median-interpolated | median (lower interpolated value) |
| stddev | standard deviation |
| stderr | standard error |
| entropy | Shannon entropy of a distribution. The natural logarithm is used. |
| mode | Since v8.0.7. The most common value in the dataset. Strict float64 equality is used when comparing values, so users should take caution. If several values are the mode, any of them may be returned. |
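
The windowing idea (inclusive start, exclusive end, one output per period, a substitute value for empty periods) can be sketched in a few lines of Python. This is a simplified model with a single avg field, integer timestamps, and windows aligned to fromTime; those simplifications and the function name `timewindow_avg` are assumptions for illustration, not the behavior of the actual implementation.

```python
def timewindow_avg(records, from_time, until_time, period, null_value=None):
    """records: iterable of (time, value). Returns one (window_start, avg) per period."""
    buckets = {}
    for t, v in records:
        if from_time <= t < until_time:   # fromTime inclusive, untilTime exclusive
            start = from_time + ((t - from_time) // period) * period
            buckets.setdefault(start, []).append(v)
    out = []
    for start in range(from_time, until_time, period):
        vals = buckets.get(start)
        # Periods with no raw values yield the substitute value instead.
        out.append((start, sum(vals) / len(vals) if vals else null_value))
    return out

raw = [(0, 1.0), (1, 3.0), (5, 10.0), (10, 99.0)]   # t=10 falls outside [0, 10)
for row in timewindow_avg(raw, from_time=0, until_time=10, period=2):
    print(row)
# (0, 2.0)
# (2, None)
# (4, 10.0)
# (6, None)
# (8, None)
```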

FFT()

Syntax: FFT()

It assumes the value of the incoming record is an array of (time, amplitude) tuples, then applies a Fast Fourier Transform to the array and replaces the value with an array of (frequency, amplitude) tuples. The key remains the same.

For example, if the incoming record is {key: k, value: [ [t1,a1],[t2,a2],...,[tn,an] ]}, it transforms the record to {key: k, value: [ [F1,A1],[F2,A2],...,[Fm,Am] ]}.
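
To make the time-to-frequency mapping concrete, the Python sketch below converts evenly spaced (time, amplitude) samples into (frequency, amplitude) pairs. It uses a naive discrete Fourier transform rather than a fast algorithm, keeps only the bins below the Nyquist frequency, and scales amplitudes so that a unit sine shows a peak of 1.0. The scaling, the bin selection, and the function name `dft_amplitudes` are assumptions made for illustration and may differ from what FFT() returns.

```python
import cmath
import math

def dft_amplitudes(samples):
    """samples: list of (time_seconds, amplitude), evenly spaced in time.
    Returns a list of (frequency_hz, amplitude) for bins below Nyquist."""
    n = len(samples)
    dt = samples[1][0] - samples[0][0]        # sampling interval in seconds
    xs = [a for _, a in samples]
    out = []
    for k in range(n // 2):
        s = sum(x * cmath.exp(-2j * math.pi * k * i / n) for i, x in enumerate(xs))
        scale = 1 / n if k == 0 else 2 / n    # the DC bin is not doubled
        out.append((k / (n * dt), scale * abs(s)))
    return out

# A 5 Hz unit sine sampled at 100 Hz for one second.
samples = [(i / 100, math.sin(2 * math.pi * 5 * i / 100)) for i in range(100)]
freq, amp = max(dft_amplitudes(samples), key=lambda pair: pair[1])
print(round(freq, 6), round(amp, 6))  # 5.0 1.0
```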

WHEN()

Syntax: WHEN(condition, doer) Since v8.0.7

  • condition boolean
  • doer doer

WHEN runs the doer action if the given condition is true. This function does not affect the flow of records; it only executes the defined side-effect work.

doLog()

Syntax: doLog(args...) Since v8.0.7

Prints a log message on the web console.

FAKE( linspace(1, 2, 2))
WHEN( mod(value(0), 2) == 0, doLog(value(0), "is even."))
CSV()

doHttp()

Syntax: doHttp(method, url, body [, header...]) Since v8.0.7

  • method string
  • url string
  • body string
  • header string optional

doHttp sends a request to the HTTP endpoint with the given method, url, body, and headers.

Use cases

  • Notify a specific HTTP endpoint of an event.
FAKE( linspace(1, 4, 4))
WHEN(
    mod(value(0), 2) == 0,
    doHttp("GET", strSprintf("http://127.0.0.1:8888/notify?value=%.0f", value(0)), nil)
)
CSV()
  • Post the current record to a specific HTTP endpoint in CSV, which is the default format of doHttp.
FAKE( linspace(1, 4, 4))
WHEN(
    mod(value(0), 2) == 0,
    doHttp("POST", "http://127.0.0.1:8888/notify", value())
)
CSV()
  • Post the current record in a custom JSON format to a specific HTTP endpoint.
FAKE( linspace(1, 4, 4))
WHEN(
    mod(value(0), 2) == 0,
    doHttp("POST", "http://127.0.0.1:8888/notify", 
        strSprintf(`{"message": "even", "value":%f}`, value(0)),
        "Content-Type: application/json",
        "X-Custom-Header: notification"
    )
)
CSV()

do()

Syntax: do(args..., { sub-flow-code }) Since v8.0.7

do executes the given sub flow code, passing it the args... arguments.

It is important to keep in mind that WHEN() is only for executing a side-effect job under a certain condition. A WHEN-do sub flow cannot affect the main flow, which means it cannot use SINKs that produce results on the output stream, such as CSV, JSON, and CHART_*. The output of a sub flow is ignored: any attempt by such a sink to write is discarded and a warning message is shown.

The effective SINKs in a sub flow are INSERT and APPEND, which are not related to the output stream, so a sub flow can write specific values to a table different from the one used by the main TQL flow. Otherwise, use the DISCARD() sink, which silently discards any records in the sub flow without warning messages.

FAKE( json({
    [ 1, "hello" ],
    [ 2, "你好" ],
    [ 3, "world" ],
    [ 4, "世界" ]
}))
WHEN(
    mod(value(0), 2) == 0,
    do( "Greetings:", value(0), value(1), {
        ARGS()
        WHEN( true, doLog( value(0), value(2) ) )
        DISCARD()
    })
)
CSV()

The log messages of the above code show two important points.

  1. The main flow is blocked and waits until its sub flow finishes the job.
  2. The sub flow is executed once for every record that matches the condition.
2023-12-02 07:54:42.160 TRACE 0xc000bfa580 Task compiled FAKE() → WHEN() → CSV()
2023-12-02 07:54:42.160 TRACE 0xc000bfa840 Task compiled ARGS() → WHEN() → DISCARD()
2023-12-02 07:54:42.160 INFO  0xc000bfa840 Greetings: 你好
2023-12-02 07:54:42.160 DEBUG 0xc000bfa840 Task elapsed 254.583µs
2023-12-02 07:54:42.161 TRACE 0xc000bfa9a0 Task compiled ARGS() → WHEN() → DISCARD()
2023-12-02 07:54:42.161 INFO  0xc000bfa9a0 Greetings: 世界
2023-12-02 07:54:42.161 DEBUG 0xc000bfa9a0 Task elapsed 190.552µs
2023-12-02 07:54:42.161 DEBUG 0xc000bfa580 Task elapsed 1.102681ms
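
The two points can be restated as a small model: the side effect runs synchronously, once per matching record, and the records themselves continue downstream untouched. The Python sketch below mirrors the example's data; the names `when` and `sub_flow` are hypothetical, and the sub flow is reduced to a function that only appends to a log.

```python
def when(records, condition, doer):
    """Model of WHEN(): run doer for matching records, pass every record on unchanged."""
    for rec in records:
        if condition(rec):
            doer(rec)   # runs to completion before the next record is handled
        yield rec

log = []

def sub_flow(rec):
    # Stand-in for do(... { ARGS() ... DISCARD() }): its own output is thrown away,
    # only the side effect (here, a log line) remains.
    log.append(f"Greetings: {rec[1]}")

records = [(1, "hello"), (2, "你好"), (3, "world"), (4, "世界")]
main_output = list(when(records, lambda r: r[0] % 2 == 0, sub_flow))

print(main_output == records)  # True: the main flow is unchanged
print(log)                     # ['Greetings: 你好', 'Greetings: 世界']
```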

Use cases

When a sub flow retrieves data from a source other than its arguments, it can still access the arguments with the args([idx]) option function.

  • Execute query with sub flow’s arguments.
// pseudo code
// ...
WHEN( condition,
    do(value(0), {
        SQL(`select time, value from table where name = ?`, args(0))
        // ... some map functions...
        INSERT(...)
    })
)
// ...
  • Retrieve csv file from external web server
// pseudo code
// ...
WHEN( condition,
    do(value(0), value(1), {
        CSV( file( strSprintf("https://example.com/data_%s.csv?id=%s", args(0), escapeParam(args(1)) )))
        WHEN(true, doHttp("POST", "http://my_server", value()))
        DISCARD()
    })
)
// ...

SCRIPT()

Syntax: SCRIPT({ ... script code... })

Supports user-defined scripts.

See SCRIPT section for the details with examples.

tengo

tengo is a Go-like scripting language. In addition to tengo's default packages, a “context” package is available that exposes TQL-specific functionality.

TQL can execute a user-defined script within SCRIPT() by passing the code between { and }.

ℹ️
IMPORTANT
If the script calls neither context.yield() nor context.drop() explicitly for a record, the record is passed to the next function without any changes.

The context in the code of SCRIPT() provides several methods.

| method | description |
|---|---|
| ctx.key() | returns the key of the current record |
| ctx.value(idx) | returns the value at index idx of the current record, or the whole value as an array if idx is omitted |
| ctx.drop() | discards the current record |
| ctx.yield(values...) | produces a new record from the given values... arguments |
| ctx.yieldKey(key, values...) | produces a new record from the given key and values... |
| ctx.param(name [, default]) | returns the value of the request parameter for the given name. If the parameter specified by name does not exist, it returns the default string. If default is not specified, it returns an empty string "" |
| ctx.bridge(name) | returns the bridge instance of the given name |
| ctx.println(args...) | prints a log message to the web console |
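
The pass-through rule described in the note above interacts with yield and drop in a way that a small model makes explicit. The Python sketch below is not tengo and not the TQL runtime; `Context`, `run_script`, and the method name `yield_` (Python reserves `yield`) are hypothetical stand-ins that only mimic the documented behavior of value, yield, and drop.

```python
class Context:
    """Minimal stand-in for the per-record script context."""
    def __init__(self, record):
        self._record = record
        self.out = []
        self.handled = False

    def value(self, idx=None):
        # Whole value array when idx is omitted, otherwise a single element.
        return list(self._record) if idx is None else self._record[idx]

    def yield_(self, *values):
        self.handled = True
        self.out.append(list(values))

    def drop(self):
        self.handled = True

def run_script(records, script):
    result = []
    for rec in records:
        ctx = Context(rec)
        script(ctx)
        # Neither yield nor drop was called: the record passes through unchanged.
        result.extend(ctx.out if ctx.handled else [list(rec)])
    return result

def script(ctx):
    v = ctx.value(0)
    if v % 2 == 0:
        ctx.yield_(v, v * 2)   # replace even records with a transformed one
    elif v == 3:
        ctx.drop()             # discard this record
    # any other record is left alone

print(run_script([[1], [2], [3], [4]], script))  # [[1], [2, 4], [4, 8]]
```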

context package

context.yield()

Pass the incoming arguments to the next function.

context.key()

Returns the key of the current record.

context.value()

Returns the whole value of the current record as an array. If an index is given, it returns the element at that index.

For example, if the current record is [0, true, "hello", "world"]

  • context.value() returns the whole values of the record [0, true, "hello", "world"]
  • context.value(0) returns the first element of the record 0
  • context.value(3) returns the last element of the record "world"

times package

times.time_format()

Convert timestamp to a string.

times.parse()

Convert a string to timestamp.

json package

json.decode()

Parse JSON string.

If the script ends with APPEND(...) or INSERT(...) instead of CSV(), the final result records are written into the database.

Example

Open a new TQL editor in the web UI, copy the code below, and run it.

In this example, linspace(-4,4,100) generates an array of 100 evenly spaced elements ranging from -4.0 to 4.0. meshgrid() takes two arrays and produces a new meshed array. As a result, FAKE() in the example produces an array of 10000 elements (100 x 100 meshed), each of which is an array of two floating-point numbers. The SCRIPT() function takes a code block enclosed by { and } and runs it for each record. It takes the current record via context.value(), then yields the transformed data via context.yield().

The SCRIPT code above is actually equivalent to the TQL MAPVALUE(2, ...) function below. The math functions used in MAPVALUE became available since v8.0.6.
