PostgreSQLでSQLの勉強

  • このエントリーをはてなブックマークに追加

色々な事を考えていたら、ハマったのでその記録。

Postgresのバージョンは9.5.3です。(正確にはPipelineDBの0.9.5です)
例えば、以下のテーブルがあります。

1
2
3
4
CREATE TABLE imp   (campaign_id BIGINT, campaign_creative_id BIGINT,   imp_count BIGINT, CONSTRAINT   pk_imp1 PRIMARY KEY (campaign_id, campaign_creative_id));
CREATE TABLE vimp (campaign_id BIGINT, campaign_creative_id BIGINT, vimp_count BIGINT, CONSTRAINT pk_vimp1 PRIMARY KEY (campaign_id, campaign_creative_id));
CREATE TABLE click (campaign_id BIGINT, campaign_creative_id BIGINT, click_count BIGINT, CONSTRAINT pk_click1 PRIMARY KEY (campaign_id, campaign_creative_id));
CREATE TABLE conv (campaign_id BIGINT, campaign_creative_id BIGINT, conv_count BIGINT, CONSTRAINT pk_conv1 PRIMARY KEY (campaign_id, campaign_creative_id));

これを次のようにまとめたいとします。なんとなくやりたい事は理解してもらえると思います。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN click
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN conv
USING (campaign_id, campaign_creative_id)
;

期待する結果としては次のようになります。

1
2
3
4
5
6
7
INSERT INTO imp VALUES(100, 1000, 111);
INSERT INTO imp VALUES(100, 1001, 221);
INSERT INTO imp VALUES(101, 1000, 333);
INSERT INTO vimp VALUES(100, 1000, 30);
INSERT INTO click VALUES(102, 1004, 5);
INSERT INTO conv VALUES(102, 1004, 1);
INSERT INTO conv VALUES(103, 1005, 2);
1
2
3
4
5
6
7
 campaign_id | campaign_creative_id | imp_count | vimp_count | click_count | conv_count
-------------+----------------------+-----------+------------+-------------+------------
100 | 1000 | 111 | 30 | 0 | 0
100 | 1001 | 221 | 0 | 0 | 0
101 | 1000 | 333 | 0 | 0 | 0
102 | 1004 | 0 | 0 | 5 | 1
103 | 1005 | 0 | 0 | 0 | 2

ただし欲しい情報は全部ではないので、campaign_idで絞り込みます。
最終的にはFUNCTIONにして可変長引数としてbigint[]を受け取り、それをcampaign_id = ANY(campaign_ids)みたいな事をしますが、問題を簡単にするためにひとまず1件のcampaign_idを指定します。

いくつかのパターンを試す

ダミーデータで実行してもあまり意味が無いので、手元にあるテーブルを使います。
テーブル名が上記のサンプルからがらりと変わりますが、適宜脳内変換をして頂ければ…

愚直にWHERE句に条件を書く

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN click
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN conv
USING (campaign_id, campaign_creative_id)
WHERE
campaign_id = 102

imp.campaign_idではなくcampaign_id
WHERE句に書いたフィルタリングは結合の後に実行されます。

1
2
3
 campaign_id | campaign_creative_id | imp_count | vimp_count | click_count | conv_count
-------------+----------------------+-----------+------------+-------------+------------
102 | 1004 | 0 | 0 | 5 | 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Hash Full Join  (cost=257.71..361.29 rows=8 width=96) (actual time=0.051..0.056 rows=1 loops=1)
Hash Cond: ((COALESCE(COALESCE(imp.campaign_id, vimp.campaign_id), click.campaign_id) = conv.campaign_id) AND (COALESCE(COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id), click.campaign_creative_id) = conv.campaign_creative_id))
Filter: (COALESCE(COALESCE(COALESCE(imp.campaign_id, vimp.campaign_id), click.campaign_id), conv.campaign_id) = 102)
Rows Removed by Filter: 3
-> Merge Full Join (cost=243.22..335.01 rows=1570 width=72) (actual time=0.024..0.028 rows=4 loops=1)
Merge Cond: ((click.campaign_id = (COALESCE(imp.campaign_id, vimp.campaign_id))) AND (click.campaign_creative_id = (COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id))))
-> Index Scan using pk_click1 on click (cost=0.15..71.70 rows=1570 width=24) (actual time=0.003..0.003 rows=1 loops=1)
-> Sort (cost=243.06..246.99 rows=1570 width=48) (actual time=0.019..0.020 rows=3 loops=1)
Sort Key: (COALESCE(imp.campaign_id, vimp.campaign_id)), (COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id))
Sort Method: quicksort Memory: 25kB
-> Merge Full Join (cost=0.30..159.72 rows=1570 width=48) (actual time=0.006..0.010 rows=3 loops=1)
Merge Cond: ((imp.campaign_id = vimp.campaign_id) AND (imp.campaign_creative_id = vimp.campaign_creative_id))
-> Index Scan using pk_imp1 on imp (cost=0.15..71.70 rows=1570 width=24) (actual time=0.002..0.002 rows=3 loops=1)
-> Index Scan using pk_vimp1 on vimp (cost=0.15..71.70 rows=1570 width=24) (actual time=0.002..0.003 rows=1 loops=1)
-> Hash (cost=14.37..14.37 rows=8 width=24) (actual time=0.013..0.013 rows=1 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 9kB
-> Bitmap Heap Scan on conv (cost=4.21..14.37 rows=8 width=24) (actual time=0.011..0.011 rows=1 loops=1)
Recheck Cond: (campaign_id = 102)
Heap Blocks: exact=1
-> Bitmap Index Scan on pk_conv1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.008..0.008 rows=1 loops=1)
Index Cond: (campaign_id = 102)
Planning time: 0.295 ms
Execution time: 0.119 ms

WHERE句に全部条件を書いてみる

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN click
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN conv
USING (campaign_id, campaign_creative_id)
WHERE
imp.campaign_id = 102
OR vimp.campaign_id = 102
OR click.campaign_id = 102
OR conv.campaign_id = 102
;

※今回のケースではANDで繋げるのはNG。とてもひどいことになった気がする。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Merge Full Join  (cost=418.50..510.92 rows=31 width=96) (actual time=0.034..0.035 rows=1 loops=1)
Merge Cond: ((conv.campaign_id = (COALESCE(COALESCE(imp.campaign_id, vimp.campaign_id), click.campaign_id))) AND (conv.campaign_creative_id = (COALESCE(COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id), click.campaign_creative_id))))
Filter: ((imp.campaign_id = 102) OR (vimp.campaign_id = 102) OR (click.campaign_id = 102) OR (conv.campaign_id = 102))
Rows Removed by Filter: 4
-> Index Scan using pk_conv1 on conv (cost=0.15..71.70 rows=1570 width=24) (actual time=0.006..0.006 rows=2 loops=1)
-> Sort (cost=418.35..422.28 rows=1570 width=72) (actual time=0.022..0.022 rows=4 loops=1)
Sort Key: (COALESCE(COALESCE(imp.campaign_id, vimp.campaign_id), click.campaign_id)), (COALESCE(COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id), click.campaign_creative_id))
Sort Method: quicksort Memory: 25kB
-> Merge Full Join (cost=243.22..335.01 rows=1570 width=72) (actual time=0.015..0.018 rows=4 loops=1)
Merge Cond: ((click.campaign_id = (COALESCE(imp.campaign_id, vimp.campaign_id))) AND (click.campaign_creative_id = (COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id))))
-> Index Scan using pk_click1 on click (cost=0.15..71.70 rows=1570 width=24) (actual time=0.001..0.001 rows=1 loops=1)
-> Sort (cost=243.06..246.99 rows=1570 width=48) (actual time=0.012..0.012 rows=3 loops=1)
Sort Key: (COALESCE(imp.campaign_id, vimp.campaign_id)), (COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id))
Sort Method: quicksort Memory: 25kB
-> Merge Full Join (cost=0.30..159.72 rows=1570 width=48) (actual time=0.005..0.008 rows=3 loops=1)
Merge Cond: ((imp.campaign_id = vimp.campaign_id) AND (imp.campaign_creative_id = vimp.campaign_creative_id))
-> Index Scan using pk_imp1 on imp (cost=0.15..71.70 rows=1570 width=24) (actual time=0.002..0.003 rows=3 loops=1)
-> Index Scan using pk_vimp1 on vimp (cost=0.15..71.70 rows=1570 width=24) (actual time=0.001..0.002 rows=1 loops=1)
Planning time: 0.194 ms
Execution time: 0.084 ms

JOINにONを使って、そこで絞り込む

https://www.postgresql.jp/document/9.5/html/queries-table-expressions.html

この理由はON句の中の制約は結合の前に処理され、一方WHERE句の中の制約は結合の後に処理されることによります。 これは内部結合には影響がありませんが、外部結合には大きな影響があります。

JOINは結合前にフィルタリングされる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
ON (imp.campaign_id = vimp.campaign_id AND imp.campaign_creative_id = vimp.campaign_creative_id)
FULL OUTER JOIN click
ON (imp.campaign_id = click.campaign_id AND imp.campaign_creative_id = click.campaign_creative_id)
FULL OUTER JOIN conv
ON (imp.campaign_id = conv.campaign_id AND imp.campaign_creative_id = conv.campaign_creative_id)
;

あれ..この時点で結果が違うぞ..

1
2
3
4
5
6
7
8
 campaign_id | campaign_creative_id | imp_count | vimp_count | click_count | conv_count
-------------+----------------------+-----------+------------+-------------+------------
100 | 1000 | 111 | 30 | 0 | 0
100 | 1001 | 221 | 0 | 0 | 0
101 | 1000 | 333 | 0 | 0 | 0
102 | 1004 | 0 | 0 | 5 | 0
102 | 1004 | 0 | 0 | 0 | 1
103 | 1005 | 0 | 0 | 0 | 2

こうでした。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
ON (imp.campaign_id = vimp.campaign_id AND imp.campaign_creative_id = vimp.campaign_creative_id)
FULL OUTER JOIN click
ON (vimp.campaign_id = click.campaign_id AND vimp.campaign_creative_id = click.campaign_creative_id)
FULL OUTER JOIN conv
ON (click.campaign_id = conv.campaign_id AND click.campaign_creative_id = conv.campaign_creative_id)
;
1
2
3
4
5
6
7
 campaign_id | campaign_creative_id | imp_count | vimp_count | click_count | conv_count
-------------+----------------------+-----------+------------+-------------+------------
102 | 1004 | 0 | 0 | 5 | 1
100 | 1000 | 111 | 30 | 0 | 0
100 | 1001 | 221 | 0 | 0 | 0
101 | 1000 | 333 | 0 | 0 | 0
103 | 1005 | 0 | 0 | 0 | 2

これを元にフィルタリング。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
ON (imp.campaign_id = 102 AND vimp.campaign_id = 102 AND imp.campaign_id = vimp.campaign_id AND imp.campaign_creative_id = vimp.campaign_creative_id)
FULL OUTER JOIN click
ON (vimp.campaign_id = 102 AND click.campaign_id = 102 AND vimp.campaign_id = click.campaign_id AND vimp.campaign_creative_id = click.campaign_creative_id)
FULL OUTER JOIN conv
ON (click.campaign_id = 102 AND conv.campaign_id = 102 AND click.campaign_id = conv.campaign_id AND click.campaign_creative_id = conv.campaign_creative_id)
;

期待する結果にならないな.. FULL OUTER JOINは難しい。
そもそも何をしたいかと言うと、WHERE句に書くとJOINした後のフィルタリングなので、JOINする前に必要な情報だけに削ってからJOINしたい。
という事は、campaign_id=102で絞った各テーブルを結合する事になる。

WITH構文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
WITH 
fimp AS (SELECT * FROM imp WHERE campaign_id = 102),
fvimp AS (SELECT * FROM vimp WHERE campaign_id = 102),
fclick AS (SELECT * FROM click WHERE campaign_id = 102),
fconv AS (SELECT * FROM conv WHERE campaign_id = 102)
SELECT
COALESCE(fimp.campaign_id, fvimp.campaign_id, fclick.campaign_id, fconv.campaign_id) as campaign_id,
COALESCE(fimp.campaign_creative_id, fvimp.campaign_creative_id, fclick.campaign_creative_id, fconv.campaign_creative_id) as campaign_creative_id,
COALESCE(fimp.imp_count, 0) as imp_count,
COALESCE(fvimp.vimp_count, 0) as vimp_count,
COALESCE(fclick.click_count, 0) as click_count,
COALESCE(fconv.conv_count, 0) as conv_count
FROM
fimp
FULL OUTER JOIN fvimp
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN fclick
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN fconv
USING (campaign_id, campaign_creative_id)
;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Hash Full Join  (cost=58.33..58.70 rows=8 width=96) (actual time=0.071..0.075 rows=1 loops=1)
Hash Cond: ((COALESCE(COALESCE(fimp.campaign_id, fvimp.campaign_id), fclick.campaign_id) = fconv.campaign_id) AND (COALESCE(COALESCE(fimp.campaign_creative_id, fvimp.campaign_creative_id), fclick.campaign_creative_id) = fconv.campaign_creative_id))
CTE fimp
-> Bitmap Heap Scan on imp (cost=4.21..14.37 rows=8 width=24) (actual time=0.004..0.004 rows=0 loops=1)
Recheck Cond: (campaign_id = 102)
-> Bitmap Index Scan on pk_imp1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.003..0.003 rows=0 loops=1)
Index Cond: (campaign_id = 102)
CTE fvimp
-> Bitmap Heap Scan on vimp (cost=4.21..14.37 rows=8 width=24) (actual time=0.014..0.014 rows=0 loops=1)
Recheck Cond: (campaign_id = 102)
-> Bitmap Index Scan on pk_vimp1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.014..0.014 rows=0 loops=1)
Index Cond: (campaign_id = 102)
CTE fclick
-> Bitmap Heap Scan on click (cost=4.21..14.37 rows=8 width=24) (actual time=0.003..0.003 rows=1 loops=1)
Recheck Cond: (campaign_id = 102)
Heap Blocks: exact=1
-> Bitmap Index Scan on pk_click1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.002..0.002 rows=1 loops=1)
Index Cond: (campaign_id = 102)
CTE fconv
-> Bitmap Heap Scan on conv (cost=4.21..14.37 rows=8 width=24) (actual time=0.010..0.010 rows=1 loops=1)
Recheck Cond: (campaign_id = 102)
Heap Blocks: exact=1
-> Bitmap Index Scan on pk_conv1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.008..0.008 rows=1 loops=1)
Index Cond: (campaign_id = 102)
-> Hash Full Join (cost=0.56..0.86 rows=8 width=72) (actual time=0.042..0.042 rows=1 loops=1)
Hash Cond: ((COALESCE(fimp.campaign_id, fvimp.campaign_id) = fclick.campaign_id) AND (COALESCE(fimp.campaign_creative_id, fvimp.campaign_creative_id) = fclick.campaign_creative_id))
-> Hash Full Join (cost=0.28..0.51 rows=8 width=48) (actual time=0.028..0.028 rows=0 loops=1)
Hash Cond: ((fimp.campaign_id = fvimp.campaign_id) AND (fimp.campaign_creative_id = fvimp.campaign_creative_id))
-> CTE Scan on fimp (cost=0.00..0.16 rows=8 width=24) (actual time=0.004..0.004 rows=0 loops=1)
-> Hash (cost=0.16..0.16 rows=8 width=24) (actual time=0.014..0.014 rows=0 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 8kB
-> CTE Scan on fvimp (cost=0.00..0.16 rows=8 width=24) (actual time=0.014..0.014 rows=0 loops=1)
-> Hash (cost=0.16..0.16 rows=8 width=24) (actual time=0.005..0.005 rows=1 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 9kB
-> CTE Scan on fclick (cost=0.00..0.16 rows=8 width=24) (actual time=0.003..0.003 rows=1 loops=1)
-> Hash (cost=0.16..0.16 rows=8 width=24) (actual time=0.017..0.017 rows=1 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 9kB
-> CTE Scan on fconv (cost=0.00..0.16 rows=8 width=24) (actual time=0.014..0.015 rows=1 loops=1)
Planning time: 0.513 ms
Execution time: 0.347 ms

あれ..随分遅くなった..
件数が少ないからかしら。

データを多く投入してみる

各テーブルに10万件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
WITH RECURSIVE tmp(c1,c2,c3) AS (
SELECT 1001, 10000,floor(random()*10000000)
UNION ALL
SELECT c1+1, c2+2,floor(random()*10000000)
FROM tmp WHERE c1 < 1000 + 100000
)
INSERT INTO imp SELECT * FROM tmp;

WITH RECURSIVE tmp(c1,c2,c3) AS (
SELECT 1001, 10000,floor(random()*10000000)
UNION ALL
SELECT c1+1, c2+2,floor(random()*10000000)
FROM tmp WHERE c1 < 1000 + 100000
)
INSERT INTO vimp SELECT * FROM tmp;

WITH RECURSIVE tmp(c1,c2,c3) AS (
SELECT 1001, 10000,floor(random()*10000000)
UNION ALL
SELECT c1+1, c2+2,floor(random()*10000000)
FROM tmp WHERE c1 < 1000 + 100000
)
INSERT INTO click SELECT * FROM tmp;

WITH RECURSIVE tmp(c1,c2,c3) AS (
SELECT 1001, 10000,floor(random()*10000000)
UNION ALL
SELECT c1+1, c2+2,floor(random()*10000000)
FROM tmp WHERE c1 < 1000 + 100000
)
INSERT INTO conv SELECT * FROM tmp;

なんだかんだでWITH構文(CTE)使う方法が一番良いっぽい。
最終的には、WITHで絞り込んだ情報をJOINすると、240msくらいから0.1msくらいまで高速にできました。

とはいえ、やりたいことはこれではなく、この続きでまた躓くことになるのです。
(PostgresではなくPipelineDBの不具合によって..)

参照

Kinesis Analyticsを実データで試して気になったことメモ

  • このエントリーをはてなブックマークに追加

サンプルは普通に動きますが、サンプルを動かしただけでは実際にハマりどころが分からないので、実際のデータを使ってみるよ。
調べながら書いた自分用のメモなので、非常に見にくいと思います。

https://aws.amazon.com/jp/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/
Kinesis Analyticsというものが発表されまして、Kinesisに対してContinuous Queryを実行できるものです。
端的に言えばNorikraみたいなものです。

PipelineDBにも似ている気はしますが、PipelineDBの最大の特徴である結果を保持しておいてSQLで取り出すという事は出来ないので、個人的には競合にはならなそうですね。
お手軽感で言えばConsumerを既に持っているのであれば、PipelineDBの方に軍配が上がります。(PipelineDBはkinesis connectorも実はあります)
まだ日本リージョンでは使えないので、OregonにKinesis Streamを作り、一部のproductionのデータをそちらにもコピーするようにしました。

気になった事を列挙していきます。

  • Kinesisに流れるデータは、CSVかJSONである必要がある
  • Source Streamを選択すると、実データを元にスキーマを推定して定義を作成する
    • 流れているIdがたまたま小さいと、TINYINTやSMALLINTとして扱われてしまう
  • error_stream という物が存在する
    • 定義したスキーマにそぐわないものは、このstreamに流れる
    • 他にもContinuous QueryでのエラーやRecordのOutOfOrderで流れるらしい
  • スキーマで定義できる型は次の通り
  • スキーマの名前に、
    • hwなどの1文字はNG
      • 属性名が1文字の場合はエラーが出て作成できない.. 😨
      • [a-zA-Z][a-zA-Z0-9_]+ このパターンじゃないと駄目
    • SQL:2008の予約語やKinesisの予約語もNG
      • 列名にtimestmap, rank とか駄目
  • 日本語が文字化ける
    • Raw stream sample ってタブを見ると、きちんと日本語が表示されている
    • スキーマ設定には、Record Formatという項目があって、UTF-8と書かれている(変更はできない模様)
    • Amazon Kinesis Analytics supports only Java single-byte CHARACTER SETs.Documentに書いてある
    • つまりマルチバイト文字駄目って事?
  • Timestamp型として定義する場合、Sourceは文字列, UNIXTIME 等何がOKなのか?
    • UNIXTIMEはNG
    • 文字列だったらOKだった
    • TIMEZONEは常にUTC
  • 定義したスキーマとは別に次の項目が付与される
    • PARTITION_KEY VARCHAR(512)
    • SEQUENCE_NUMBER VARCHAR(128)
    • SHARD_ID VARCHAR(64)
    • APPROXIMATE_ARRIVAL_TIME TIMESTAMP
  • Source Streamへの書き込みを止めていると、Save schema and update samplesのボタンを押してもNo rows in source streamと言われる事がある。
    • 一度通ったスキーマでも、次更新しようとするとNo rows in source streamって言われる。
  • 謎の5.0.1というバージョンが
  • S3のデータをJOIN
    • サンプルにはCompanyNameっていうS3のデータとJOINしているっぽいものがある
    • Consoleから設定はできない。APIを使う必要がある。
      • Amazon Kinesis Analytics console does not support managing reference data sources for your applications. You can use the AWS CLI to add reference data source to your application.
    • S3のデータ、マルチバイト文字列たぶん駄目だと思うけど、まだ試していない。
      • 後述の方法で登録はしてみたものの、エラーが出る。
  • 1個のApplicationにReferenceは1つまで
    • つまり、複数のS3のファイルとJOINする事はできない
  • ポチポチ弄ってると、ApplicationのStateがずっとUpdatingのままになる
    • 20分ほど放置しても、ずっと変わらず。
    • 何も操作できない..😨
  • PUMPのSQLで列名を参照する時にエラーになる
    • Schema定義の時に列名を全て大文字で定義しないと参照できない
      • じゃぁValidationの正規表現、小文字を許すべきではないのでは..😨
    • ダブルクォートで括れば良い

ここまでで、自分のデータでクエリをかけるところまではできた。

が、IAM Roleに権限を追加しても、未だにS3のファイル参照は出来ていない。
ひとまず調査はここまでにして、気になった事をサポートに投げています。

S3のデータのデータを取り込む

awscliを最新の状態にする(試した時は 1.10.56)

s3://bucket*/tamtam-test/kinesis_analytics/devices.csv

1
2
3
4
5
6
deviceType,name
IPHONE,あいぽん
IPOD,あいぽっど
IPAD,あいぱっど
ANDROID,あんどろいど
ANDROID_TABLET,あんどろいどタブレット
1
2
3
4
5
6
7
# version-idを見る
version_id=$(aws kinesisanalytics describe-application --application-name tamtam-test1 | jq .ApplicationDetail.ApplicationVersionId)

aws kinesisanalytics add-application-reference-data-source \
--application-name tamtam-test1 \
--current-application-version-id ${version_id} \
--reference-data-source "$(cat devices-input.json)"

devices-input.jsonの中身です。一部マスクしてます。
エラーが出て良くわからない場合は、--debugを付けて実行すると良いです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"TableName": "Devices",
"S3ReferenceDataSource": {
"BucketARN": "arn:aws:s3:::***bucket**",
"FileKey": "tamtam-test/kinesis_analytics/devices.csv",
"ReferenceRoleARN": "arn:aws:iam::**account id**:role/service-role/kinesis-analytics-tamtam-test1"
},
"ReferenceSchema": {
"RecordFormat": {
"RecordFormatType": "CSV",
"MappingParameters": {
"CSVMappingParameters": {
"RecordRowDelimiter": "\n",
"RecordColumnDelimiter": ","
}
}
},
"RecordEncoding": "UTF-8",
"RecordColumns": [
{
"Name": "deviceType",
"Mapping": "deviceType",
"SqlType": "VARCHAR(64)"
},
{
"Name": "name",
"Mapping": "name",
"SqlType": "VARCHAR(64)"
}
]
}
}

Real-time Analyticsのタブには出てくるんだけど、中身を見ようとするとエラーになる。

参考) http://docs.aws.amazon.com/kinesisanalytics/latest/dev/app-add-reference-data.html

PUMPとか

1
2
3
4
5
6
7
8
9
10

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (deviceModel VARCHAR(128), deviceModelCount INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
deviceModel,
COUNT(*) AS deviceModelCount
FROM "SOURCE_SQL_STREAM_001"
GROUP BY deviceModel, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

これで参照できない。
どうやら参照する時に内部的に大文字に変換していて、スキーマ定義の列が大文字ではないので、そんな列はないぞと言われる。
いくつかのパターンを試してみましたが、スキーマ定義は大文字必須のようだ。

追記: 2016/08/14
サポートの人から教えて貰いまして、ダブルクォートで括らないと大文字に変換されるそうです。ダブルクォートで括れば、スキーマ定義が小文字のままでも参照できます。
http://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html

追記: 2016/09/04
何も操作できなかった問題は、GUIで色々と弄っていたらIAM Role(のポリシー)のPermissionがおかしくなってました。
しかも1つのApplicationに複数のポリシーが紐付いていて、その状態だと画面を開く度にポリシーのバージョンが更新されていってしまう。。
これはもうGUIの不具合という事で片付けて、深追いしないことにしました。

メモ

CURRENT_ROW_TIMESTAMP
W3C_LOG_PARSE

RANDOM_CUT_FOREST
http://jmlr.org/proceedings/papers/v48/guha16.pdf

IDFAが起動する度に変わる件について

  • このエントリーをはてなブックマークに追加

iOSアプリのテストをしていて、起動時にIDFAが毎回変わっている気がする。
これではターゲティングが出来ない😨 という事で、調べてみたところ、以下のページを見つけました。

http://dev.tapjoy.com/faq/testflight-idfa-and-testing-tapjoy/
http://stackoverflow.com/questions/26648840/ios-testflight-beta-app-get-new-advertising-identifier-in-each-run

Namely, each time a Testflight-distributed app asks for the the Identifier for Advertisers (IDFA), it will get a different IDFA.

TestFlightの場合、IDFAを取得する度に異なるIDが返ってくると.. まじかよっ😨
アプリでは起動時にIDFAを取得してキャッシュしていたので、起動する度に変わっていたように見えていただけなのね。

Stream Processing Casual Talksで発表しました

  • このエントリーをはてなブックマークに追加

こちらの勉強会です。
http://connpass.com/event/35264/

参加者がガチ勢ばっかりだったので、マジびびる..
途中で海外のMeetup方式になってマジびびる…

スライドはここに置いておきます。
http://www.slideshare.net/tamrin69/pipelinedb

streamのpostgres上の実装とか、continuous viewはviewなので実体が別に_mrelテーブル(とシーケンステーブル)が存在するとか言いたかったのですが、時間が無くなってしまい殆ど駆け足でした。
知りたい事があれば個別に聞いてください。
また、弊社のランチを食べながら話をしたいという方もTwitterでコンタクトくださいませ。

一部の人にはStream Processingよりも重複排除の方が引っかかったようで、発表後や懇親会ではそういうお話が出来て楽しかったです。

個人的には、予測モデルの生成のような難しい事をするのであればそれに適したミドルウェアを使えば良いと思いますが、単純にKPIを素早く出したいだけだったらPipelineDBが優れているなと思いました。
(そもそも同じ土俵で戦う製品では無いし)

主催者の方、および会場提供のY!さんありがとうございました。
ほんとガチな人しか居なかったし、発表もガチな内容しか無かったので楽しかったです。

Lavernaの開発環境を整える

  • このエントリーをはてなブックマークに追加

LavernaはElectronベースのOSSです。
web-sequence図やdot形式の図が描画できたら良いなぁと思って、ちょっと開発環境を整えてみます。

READMEにある通りですが、

1
2
3
4
5
6
7
8
9
npm install -g bower
npm install -g gulp

git clone git@github.com:Laverna/laverna.git
cd laverna

npm install
bower install
(cd test && bower install)

みたいな感じです。

起動する時は、以下のように起動します。

1
electron .

この場合はブラウザが立ち上がりますが、アプリとして起動する場合は、以下のように--noServerを付けます。

1
electron . --noServer

一番最初に --noServerを付けて起動するとアプリの中身が何も表示されなかったので、最初は普通に起動する必要があるっぽいです。
開発モード(Inspectorとか表示される)をONにして起動する場合は、--devをつけます。

1
electron . --noServer --dev

修正したソースが反映されない

Macの場合、~/Library/Application Support/laverna/にキャッシュが保存されているので、これを消します。
Localeを編集した時、一向に反映される気配が無くて、インストールし直してもビルドし直しても駄目で、これが原因でした。

Evernoteのかわりにlavernaを使ってみる

  • このエントリーをはてなブックマークに追加

Evernote、最近雲行きが怪しい。Premium契約をしているけど、一向にテンプレートの機能が追加されないし、別のに乗り換えようかなと思っています。
その前はOneNote使っていたのですが、これはこれでちょっとアレなんですよね。

今は、Quiverというアプリを使っていて、Mac専用。。
Quiverの良いところは、1つのページにブロックの概念があって、MarkdownやCode等を貼り付けられること。Sequence図も書ける。ただMac専用..

欲しい機能としては、

  • Mac, Windows, iPhone(Webでも可)で動く
  • クラウド保存(Dropbox連携とかでも可)
  • Markdownで書ける
  • 画像を貼り付ける事ができる

そこで見つけたのがlavernaというアプリ。
Linux, Windows, Macで動く上にブラウザでも使える。しかもOSS。
スマフォ版アプリは無いけどAndroid版が出来るらしい。
Web版はDropboxと連携する事でドキュメントを保存する事ができます。

ただ、Webで貼り付けた画像がMacでは読み込めません。逆もまたしかり。
画像はDropboxに保存されるわけではないっぽい。残念..

他にはsimplenoteというものがあるらしい。

Java8のラムダメソッドにAnnotationを定義したい

  • このエントリーをはてなブックマークに追加

日本語で説明が難しい。のでやりたい事をコードで。

1
2
3
4
5
6
7
exec_service.submit(new Callable<String>(){
@Trace // <- これ!!
@Override
public String call() {
System.out.println("do something");
}
});

これをLambdaで書くと、

1
2
3
exec_service.submit(()->{
System.out.println("do something");
});

こうなるわけだけど、ここに@Traceを設定するにはどーすれば良いか。

結論としては無理っぽい。(´・ω・`)しょぼーん

http://stackoverflow.com/questions/23968736/annotated-lambda-expression-instead-of-interface-with-single-apply-method/23983479#23983479
http://stackoverflow.com/questions/22375891/annotating-the-functional-interface-of-a-lambda-expression

GoogleDriveのAPIでハマった

  • このエントリーをはてなブックマークに追加

SlackにUploadされたファイルをGoogleDriveに自動で保存していくコードをRubyで書いていたのですが以下の現象が発生していました。
ちなみにZapier等を使わなかったのは、何かの必要な情報が取れなかったからです。(昔の事なので忘れた..)

現象: Googleドライブにアップロードしたファイルをブラウザからダウンロードした時、ファイル名に謎の拡張子がついてしまう。

例えば、sample.pngという名前でアップロードしているのにsample.png.fileという名前でダウンロードされます。

原因は以下のようなコードでした。
child_title=sample.png, local_file=tmp.file, mime=image/png としていました。
中身のコードを追っていないので適当な推測ですが、titleはファイル名にはなってくれますが、ダウンロードした時の拡張子はそのようになるわけではないっぽいです。
request_schemaの設定かmediaの設定あたり、何か足りていないのかしら..?(´・ω・`)

https://developers.google.com/drive/v2/reference/files/insert#request-body
によると、originalFilenameかしら?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
require 'google/api_client'
require 'google_drive'

def make_gd_file(parent_id, child_title, local_file, mime)
file = @drive.files.insert.request_schema.new({
'title' => child_title,
'mimeType' => mime
})
file.parents = [{'id' => parent_id}]
media = Google::APIClient::UploadIO.new(local_file, mime)
result = @client.execute(
:api_method => @drive.files.insert,
:body_object => file,
:media => media,
:parameters => {
'uploadType' => 'multipart',
'alt' => 'json'
}
)
if result.status == 200
return result.data
else
puts "An error occurred(make_gd_file): #{result.data['error']['message']}"
return nil
end
end

JavaのEnumにおける初期化の順番

  • このエントリーをはてなブックマークに追加

この前ハマったので、備忘録メモ。
http://stackoverflow.com/questions/6435267/java-enum-static-final-instance-variables

EnumをEnum名(.name())とは別の特定のキーから逆引きできるコードを書きたくて、以下のように書いてみました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.HashMap;
import java.util.Map;

public enum MyEnum {
ATTR1("aliasA", "aliasB"),
ATTR2("aliasC")
;
private static final Map<String, MyEnum> lookupMap = new HashMap<>();
private MyEnum(String... args) {
for (String s: args) {
lookupMap.put(s, this); // コンストラクタでstatic変数を参照
}
}
public static MyEnum lookup(String s) {
return lookupMap.get(s);
}
}

すると、こんなエラーが出ました。

Cannot refer to the static enum field MyEnum.lookupMap within an initializer

コンストラクタ内でstatic変数を参照しているのがNGのようです。
通常のクラスの場合、staticイニシャライザが最初に実行されるのですが、
Enumの場合は、staticイニシャライザよりもinstanceのコンストラクタの方が先に実行されるようです。

というわけで、こう書きます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.HashMap;
import java.util.Map;

public enum MyEnum {
ATTR1("aliasA", "aliasB"),
ATTR2("aliasC")
;
private static final Map<String, MyEnum> lookupMap = new HashMap<>();
private final String[] aliases;
static {
for (MyEnum e : values()) { // 全てのEnum要素を列挙して
for (String alias: e.aliases) { // 各Enumのインスタンス変数を参照し
lookupMap.put(alias, e); // Lookupテーブルに格納する
}
}
}
private MyEnum(String... args) {
this.aliases = args; // インスタンス変数に待避
}
public static MyEnum lookup(String s) {
return lookupMap.get(s);
}
}

もしくは、InnerClassでHolderクラスを作ります。こっちの方がスマートですね。
この場合、Holder#static{}が一番最初に呼び出されるので、MyEnumコンストラクタから直接参照できるわけです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.HashMap;
import java.util.Map;

public enum MyEnum {
ATTR1("aliasA", "aliasB"),
ATTR2("aliasC")
;
private static final class Holder {
private static final Map<String, MyEnum> lookupMap = new HashMap<>();
}
private MyEnum(String... args) {
for (String alias : args) {
Holder.lookupMap.put(alias, this);
}
}
public static MyEnum lookup(String s) {
return lookupMap.get(s);
}
}

初期化順序確認コード

Enumと普通のクラスの初期化順を確認するコードです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class InitOrderTest {
public static enum MyEnum {
A, B, C;
static {
System.out.println("enum-static-init");
}
private MyEnum() {
System.out.println("enum-constructor");
}
{
System.out.println("enum-init");
}
}
public static class MyClass {
static {
System.out.println("class-static-init");
}
MyClass() {
System.out.println("class-constructor");
}
{
System.out.println("class-init");
}
}
public static void main(String[] args) {
System.out.println("main");
MyEnum.values();
new MyClass();
}
}

classの場合は、static -> {} -> ()という順番に対して、Enumの場合は、{} -> () -> staticという順番になっています。

1
2
3
4
5
6
7
8
9
10
11
main
enum-init
enum-constructor
enum-init
enum-constructor
enum-init
enum-constructor
enum-static-init
class-static-init
class-init
class-constructor

PipelineDBでCV定義をする時に気をつけること(時間関数編)とContinuous Transform

  • このエントリーをはてなブックマークに追加

CV(Continuous View)を定義する時に、その瞬間の時間を表すものは、arrival_timestampclock_timestamp()が使えます。
now()等も時間を返しますが、この関数はCV定義に使う事はできません。

現在の日付と時刻(現在のトランザクションの開始時)
https://www.postgresql.jp/document/9.5/html/functions-datetime.html

とあります。通常のSQLではこれでも問題ないのですが、CVはStreamに対してずっとTransactionをかけている事になっているので、実質的にPipelineDBの起動時間やCVを定義した時間が返ってくる事になります。
なのでCVでは使う事ができません。

また、click_timestamp()は1つしか定義できません。複数定義すると以下のエラーが発生します。

1
2
3
4
5
6
7
8
9
10
11
12
CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM sn_action_stream
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
AND to_timestamp((item->>'t')::bigint) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;
1
ERROR: clock_timestamp() may only appear once in a WHERE clause

そもそも、なぜこのようにしているかと言うと、直近3日間に到着したログだけを対象にしたいが、ログは遅延して送られてくる事が多く(数日遅れて来る場合もある..)レコードの日付も3日以内である事をチェックしたい場合などです。
その場合、レコードの時間だけを対象にすれば良いのですが、それだとPipelineDBのレコードとしてのLifetimeを設定できないのです。
WITH (max_age)使えば良いのですが。

どうしても複数使いたい場合は、サブクエリで回避できます。

1
2
3
4
5
6
7
8
9
10
11
CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM (SELECT arrival_timestamp, item FROM sn_action_stream WHERE to_timestamp((item->>'eventTimestamp')::bigint) > clock_timestamp() - interval '3 days') _
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;

ただし、こんな事をしなくても Continuous Transform を使うともっと良い感じにできます。
Continuous Transformとは、StreamからCVを定義し、その結果を別のStreamへ流す事ができます。つまりフィルタとしての振る舞いをします。

1
2
3
4
5
6
7
8
9
10
CREATE STREAM strm_test1 (item JSONB);
CREATE STREAM strm_test2 (item JSONB);

CREATE CONTINUOUS TRANSFORM test_etl AS
SELECT item::jsonb
FROM strm_test1
WHERE
to_timestamp((item->>'eventTimestamp')::bigint) > clock_timestamp() - interval '3 days'
THEN EXECUTE PROCEDURE pipeline_stream_insert('strm_test2')
;

このように定義して、綺麗になったtest2を参照するCVを定義します。

1
2
3
4
5
6
7
8
9
10
11
CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM strm_test2
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;

pipeline_stream_insertはv0.9.3で用意された関数ですが、これを使わなくても独自で定義する事ができます。
話がだんだん逸れていきますが、Continuous Transformを使えばStreamのコピーをする事もできます。
それはまた別の記事で。