PipelineDBでCV定義をする時に気をつけること(時間関数編)とContinuous Transform

  • このエントリーをはてなブックマークに追加

CV(Continuous View)を定義する時に、その瞬間の時間を表すものは、arrival_timestampclock_timestamp()が使えます。
now()等も時間を返しますが、この関数はCV定義に使う事はできません。

現在の日付と時刻(現在のトランザクションの開始時)

https://www.postgresql.jp/document/9.5/html/functions-datetime.html

とあります。通常のSQLではこれでも問題ないのですが、CVはStreamに対してずっとTransactionをかけている事になっているので、実質的にPipelineDBの起動時間やCVを定義した時間が返ってくる事になります。
なのでCVでは使う事ができません。

また、click_timestamp()は1つしか定義できません。複数定義すると以下のエラーが発生します。

CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM sn_action_stream
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
AND to_timestamp((item->>'t')::bigint) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;
ERROR: clock_timestamp() may only appear once in a WHERE clause

そもそも、なぜこのようにしているかと言うと、直近3日間に到着したログだけを対象にしたいが、ログは遅延して送られてくる事が多く(数日遅れて来る場合もある..)レコードの日付も3日以内である事をチェックしたい場合などです。
その場合、レコードの時間だけを対象にすれば良いのですが、それだとPipelineDBのレコードとしてのLifetimeを設定できないのです。
WITH (max_age)使えば良いのですが。

どうしても複数使いたい場合は、サブクエリで回避できます。

CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM (SELECT arrival_timestamp, item FROM sn_action_stream WHERE to_timestamp((item->>'eventTimestamp')::bigint) > clock_timestamp() - interval '3 days') _
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;

ただし、こんな事をしなくても Continuous Transform を使うともっと良い感じにできます。
Continuous Transformとは、StreamからCVを定義し、その結果を別のStreamへ流す事ができます。つまりフィルタとしての振る舞いをします。

CREATE STREAM strm_test1 (item JSONB);
CREATE STREAM strm_test2 (item JSONB);

CREATE CONTINUOUS TRANSFORM test_etl AS
SELECT item::jsonb
FROM strm_test1
WHERE
to_timestamp((item->>'eventTimestamp')::bigint) > clock_timestamp() - interval '3 days'
THEN EXECUTE PROCEDURE pipeline_stream_insert('strm_test2')
;

このように定義して、綺麗になったtest2を参照するCVを定義します。

CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM strm_test2
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;

pipeline_stream_insertはv0.9.3で用意された関数ですが、これを使わなくても独自で定義する事ができます。
話がだんだん逸れていきますが、Continuous Transformを使えばStreamのコピーをする事もできます。
それはまた別の記事で。