Kinesis Analyticsを実データで試して気になったことメモ

  • このエントリーをはてなブックマークに追加

サンプルは普通に動きますが、サンプルを動かしただけでは実際にハマりどころが分からないので、実際のデータを使ってみるよ。
調べながら書いた自分用のメモなので、非常に見にくいと思います。

https://aws.amazon.com/jp/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/
Kinesis Analyticsというものが発表されまして、Kinesisに対してContinuous Queryを実行できるものです。
端的に言えばNorikraみたいなものです。

PipelineDBにも似ている気はしますが、PipelineDBの最大の特徴である結果を保持しておいてSQLで取り出すという事は出来ないので、個人的には競合にはならなそうですね。
お手軽感で言えばConsumerを既に持っているのであれば、PipelineDBの方に軍配が上がります。(PipelineDBはkinesis connectorも実はあります)
まだ日本リージョンでは使えないので、OregonにKinesis Streamを作り、一部のproductionのデータをそちらにもコピーするようにしました。

気になった事を列挙していきます。

  • Kinesisに流れるデータは、CSVかJSONである必要がある
  • Source Streamを選択すると、実データを元にスキーマを推定して定義を作成する
    • 流れているIdがたまたま小さいと、TINYINTやSMALLINTとして扱われてしまう
  • error_stream という物が存在する
    • 定義したスキーマにそぐわないものは、このstreamに流れる
    • 他にもContinuous QueryでのエラーやRecordのOutOfOrderで流れるらしい
  • スキーマで定義できる型は次の通り
  • スキーマの名前に、
    • hwなどの1文字はNG
      • 属性名が1文字の場合はエラーが出て作成できない.. 😨
      • [a-zA-Z][a-zA-Z0-9_]+ このパターンじゃないと駄目
    • SQL:2008の予約語やKinesisの予約語もNG
      • 列名にtimestmap, rank とか駄目
  • 日本語が文字化ける
    • Raw stream sample ってタブを見ると、きちんと日本語が表示されている
    • スキーマ設定には、Record Formatという項目があって、UTF-8と書かれている(変更はできない模様)
    • Amazon Kinesis Analytics supports only Java single-byte CHARACTER SETs.Documentに書いてある
    • つまりマルチバイト文字駄目って事?
  • Timestamp型として定義する場合、Sourceは文字列, UNIXTIME 等何がOKなのか?
    • UNIXTIMEはNG
    • 文字列だったらOKだった
    • TIMEZONEは常にUTC
  • 定義したスキーマとは別に次の項目が付与される
    • PARTITION_KEY VARCHAR(512)
    • SEQUENCE_NUMBER VARCHAR(128)
    • SHARD_ID VARCHAR(64)
    • APPROXIMATE_ARRIVAL_TIME TIMESTAMP
  • Source Streamへの書き込みを止めていると、Save schema and update samplesのボタンを押してもNo rows in source streamと言われる事がある。
    • 一度通ったスキーマでも、次更新しようとするとNo rows in source streamって言われる。
  • 謎の5.0.1というバージョンが
  • S3のデータをJOIN
    • サンプルにはCompanyNameっていうS3のデータとJOINしているっぽいものがある
    • Consoleから設定はできない。APIを使う必要がある。
      • Amazon Kinesis Analytics console does not support managing reference data sources for your applications. You can use the AWS CLI to add reference data source to your application.
    • S3のデータ、マルチバイト文字列たぶん駄目だと思うけど、まだ試していない。
      • 後述の方法で登録はしてみたものの、エラーが出る。
  • 1個のApplicationにReferenceは1つまで
    • つまり、複数のS3のファイルとJOINする事はできない
  • ポチポチ弄ってると、ApplicationのStateがずっとUpdatingのままになる
    • 20分ほど放置しても、ずっと変わらず。
    • 何も操作できない..😨
  • PUMPのSQLで列名を参照する時にエラーになる
    • Schema定義の時に列名を全て大文字で定義しないと参照できない
      • じゃぁValidationの正規表現、小文字を許すべきではないのでは..😨
    • ダブルクォートで括れば良い

ここまでで、自分のデータでクエリをかけるところまではできた。

が、IAM Roleに権限を追加しても、未だにS3のファイル参照は出来ていない。
ひとまず調査はここまでにして、気になった事をサポートに投げています。

S3のデータのデータを取り込む

awscliを最新の状態にする(試した時は 1.10.56)

s3://bucket*/tamtam-test/kinesis_analytics/devices.csv

deviceType,name
IPHONE,あいぽん
IPOD,あいぽっど
IPAD,あいぱっど
ANDROID,あんどろいど
ANDROID_TABLET,あんどろいどタブレット
# version-idを見る
version_id=$(aws kinesisanalytics describe-application --application-name tamtam-test1 | jq .ApplicationDetail.ApplicationVersionId)

aws kinesisanalytics add-application-reference-data-source \
--application-name tamtam-test1 \
--current-application-version-id ${version_id} \
--reference-data-source "$(cat devices-input.json)"

devices-input.jsonの中身です。一部マスクしてます。
エラーが出て良くわからない場合は、--debugを付けて実行すると良いです。

{
"TableName": "Devices",
"S3ReferenceDataSource": {
"BucketARN": "arn:aws:s3:::***bucket**",
"FileKey": "tamtam-test/kinesis_analytics/devices.csv",
"ReferenceRoleARN": "arn:aws:iam::**account id**:role/service-role/kinesis-analytics-tamtam-test1"
},
"ReferenceSchema": {
"RecordFormat": {
"RecordFormatType": "CSV",
"MappingParameters": {
"CSVMappingParameters": {
"RecordRowDelimiter": "\n",
"RecordColumnDelimiter": ","
}
}
},
"RecordEncoding": "UTF-8",
"RecordColumns": [
{
"Name": "deviceType",
"Mapping": "deviceType",
"SqlType": "VARCHAR(64)"
},
{
"Name": "name",
"Mapping": "name",
"SqlType": "VARCHAR(64)"
}
]
}
}

Real-time Analyticsのタブには出てくるんだけど、中身を見ようとするとエラーになる。

参考) http://docs.aws.amazon.com/kinesisanalytics/latest/dev/app-add-reference-data.html

PUMPとか


CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (deviceModel VARCHAR(128), deviceModelCount INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
deviceModel,
COUNT(*) AS deviceModelCount
FROM "SOURCE_SQL_STREAM_001"
GROUP BY deviceModel, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

これで参照できない。
どうやら参照する時に内部的に大文字に変換していて、スキーマ定義の列が大文字ではないので、そんな列はないぞと言われる。
いくつかのパターンを試してみましたが、スキーマ定義は大文字必須のようだ。

追記: 2016/08/14
サポートの人から教えて貰いまして、ダブルクォートで括らないと大文字に変換されるそうです。ダブルクォートで括れば、スキーマ定義が小文字のままでも参照できます。
http://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html

追記: 2016/09/04
何も操作できなかった問題は、GUIで色々と弄っていたらIAM Role(のポリシー)のPermissionがおかしくなってました。
しかも1つのApplicationに複数のポリシーが紐付いていて、その状態だと画面を開く度にポリシーのバージョンが更新されていってしまう。。
これはもうGUIの不具合という事で片付けて、深追いしないことにしました。

メモ

CURRENT_ROW_TIMESTAMP
W3C_LOG_PARSE

RANDOM_CUT_FOREST
http://jmlr.org/proceedings/papers/v48/guha16.pdf

IDFAが起動する度に変わる件について

  • このエントリーをはてなブックマークに追加

iOSアプリのテストをしていて、起動時にIDFAが毎回変わっている気がする。
これではターゲティングが出来ない😨 という事で、調べてみたところ、以下のページを見つけました。

http://dev.tapjoy.com/faq/testflight-idfa-and-testing-tapjoy/
http://stackoverflow.com/questions/26648840/ios-testflight-beta-app-get-new-advertising-identifier-in-each-run

Namely, each time a Testflight-distributed app asks for the the Identifier for Advertisers (IDFA), it will get a different IDFA.

TestFlightの場合、IDFAを取得する度に異なるIDが返ってくると.. まじかよっ😨
アプリでは起動時にIDFAを取得してキャッシュしていたので、起動する度に変わっていたように見えていただけなのね。

Stream Processing Casual Talksで発表しました

  • このエントリーをはてなブックマークに追加

こちらの勉強会です。
http://connpass.com/event/35264/

参加者がガチ勢ばっかりだったので、マジびびる..
途中で海外のMeetup方式になってマジびびる…

スライドはここに置いておきます。
http://www.slideshare.net/tamrin69/pipelinedb

streamのpostgres上の実装とか、continuous viewはviewなので実体が別に_mrelテーブル(とシーケンステーブル)が存在するとか言いたかったのですが、時間が無くなってしまい殆ど駆け足でした。
知りたい事があれば個別に聞いてください。
また、弊社のランチを食べながら話をしたいという方もTwitterでコンタクトくださいませ。

一部の人にはStream Processingよりも重複排除の方が引っかかったようで、発表後や懇親会ではそういうお話が出来て楽しかったです。

個人的には、予測モデルの生成のような難しい事をするのであればそれに適したミドルウェアを使えば良いと思いますが、単純にKPIを素早く出したいだけだったらPipelineDBが優れているなと思いました。
(そもそも同じ土俵で戦う製品では無いし)

主催者の方、および会場提供のY!さんありがとうございました。
ほんとガチな人しか居なかったし、発表もガチな内容しか無かったので楽しかったです。

Lavernaの開発環境を整える

  • このエントリーをはてなブックマークに追加

LavernaはElectronベースのOSSです。
web-sequence図やdot形式の図が描画できたら良いなぁと思って、ちょっと開発環境を整えてみます。

READMEにある通りですが、

npm install -g bower
npm install -g gulp

git clone git@github.com:Laverna/laverna.git
cd laverna

npm install
bower install
(cd test && bower install)

みたいな感じです。

起動する時は、以下のように起動します。

electron .

この場合はブラウザが立ち上がりますが、アプリとして起動する場合は、以下のように--noServerを付けます。

electron . --noServer

一番最初に --noServerを付けて起動するとアプリの中身が何も表示されなかったので、最初は普通に起動する必要があるっぽいです。
開発モード(Inspectorとか表示される)をONにして起動する場合は、--devをつけます。

electron . --noServer --dev

修正したソースが反映されない

Macの場合、~/Library/Application Support/laverna/にキャッシュが保存されているので、これを消します。
Localeを編集した時、一向に反映される気配が無くて、インストールし直してもビルドし直しても駄目で、これが原因でした。

Evernoteのかわりにlavernaを使ってみる

  • このエントリーをはてなブックマークに追加

Evernote、最近雲行きが怪しい。Premium契約をしているけど、一向にテンプレートの機能が追加されないし、別のに乗り換えようかなと思っています。
その前はOneNote使っていたのですが、これはこれでちょっとアレなんですよね。

今は、Quiverというアプリを使っていて、Mac専用。。
Quiverの良いところは、1つのページにブロックの概念があって、MarkdownやCode等を貼り付けられること。Sequence図も書ける。ただMac専用..

欲しい機能としては、

  • Mac, Windows, iPhone(Webでも可)で動く
  • クラウド保存(Dropbox連携とかでも可)
  • Markdownで書ける
  • 画像を貼り付ける事ができる

そこで見つけたのがlavernaというアプリ。
Linux, Windows, Macで動く上にブラウザでも使える。しかもOSS。
スマフォ版アプリは無いけどAndroid版が出来るらしい。
Web版はDropboxと連携する事でドキュメントを保存する事ができます。

ただ、Webで貼り付けた画像がMacでは読み込めません。逆もまたしかり。
画像はDropboxに保存されるわけではないっぽい。残念..

他にはsimplenoteというものがあるらしい。

Java8のラムダメソッドにAnnotationを定義したい

  • このエントリーをはてなブックマークに追加

日本語で説明が難しい。のでやりたい事をコードで。

exec_service.submit(new Callable<String>(){
@Trace // <- これ!!
@Override
public String call() {
System.out.println("do something");
}
});

これをLambdaで書くと、

exec_service.submit(()->{
System.out.println("do something");
});

こうなるわけだけど、ここに@Traceを設定するにはどーすれば良いか。

結論としては無理っぽい。(´・ω・`)しょぼーん

http://stackoverflow.com/questions/23968736/annotated-lambda-expression-instead-of-interface-with-single-apply-method/23983479#23983479
http://stackoverflow.com/questions/22375891/annotating-the-functional-interface-of-a-lambda-expression

GoogleDriveのAPIでハマった

  • このエントリーをはてなブックマークに追加

SlackにUploadされたファイルをGoogleDriveに自動で保存していくコードをRubyで書いていたのですが以下の現象が発生していました。
ちなみにZapier等を使わなかったのは、何かの必要な情報が取れなかったからです。(昔の事なので忘れた..)

現象: Googleドライブにアップロードしたファイルをブラウザからダウンロードした時、ファイル名に謎の拡張子がついてしまう。

例えば、sample.pngという名前でアップロードしているのにsample.png.fileという名前でダウンロードされます。

原因は以下のようなコードでした。
child_title=sample.png, local_file=tmp.file, mime=image/png としていました。
中身のコードを追っていないので適当な推測ですが、titleはファイル名にはなってくれますが、ダウンロードした時の拡張子はそのようになるわけではないっぽいです。
request_schemaの設定かmediaの設定あたり、何か足りていないのかしら..?(´・ω・`)

https://developers.google.com/drive/v2/reference/files/insert#request-body
によると、originalFilenameかしら?

require 'google/api_client'
require 'google_drive'

def make_gd_file(parent_id, child_title, local_file, mime)
file = @drive.files.insert.request_schema.new({
'title' => child_title,
'mimeType' => mime
})
file.parents = [{'id' => parent_id}]
media = Google::APIClient::UploadIO.new(local_file, mime)
result = @client.execute(
:api_method => @drive.files.insert,
:body_object => file,
:media => media,
:parameters => {
'uploadType' => 'multipart',
'alt' => 'json'
}
)
if result.status == 200
return result.data
else
puts "An error occurred(make_gd_file): #{result.data['error']['message']}"
return nil
end
end

JavaのEnumにおける初期化の順番

  • このエントリーをはてなブックマークに追加

この前ハマったので、備忘録メモ。
http://stackoverflow.com/questions/6435267/java-enum-static-final-instance-variables

EnumをEnum名(.name())とは別の特定のキーから逆引きできるコードを書きたくて、以下のように書いてみました。

import java.util.HashMap;
import java.util.Map;

public enum MyEnum {
ATTR1("aliasA", "aliasB"),
ATTR2("aliasC")
;
private static final Map<String, MyEnum> lookupMap = new HashMap<>();
private MyEnum(String... args) {
for (String s: args) {
lookupMap.put(s, this); // コンストラクタでstatic変数を参照
}
}
public static MyEnum lookup(String s) {
return lookupMap.get(s);
}
}

すると、こんなエラーが出ました。

Cannot refer to the static enum field MyEnum.lookupMap within an initializer

コンストラクタ内でstatic変数を参照しているのがNGのようです。
通常のクラスの場合、staticイニシャライザが最初に実行されるのですが、
Enumの場合は、staticイニシャライザよりもinstanceのコンストラクタの方が先に実行されるようです。

というわけで、こう書きます。

import java.util.HashMap;
import java.util.Map;

public enum MyEnum {
ATTR1("aliasA", "aliasB"),
ATTR2("aliasC")
;
private static final Map<String, MyEnum> lookupMap = new HashMap<>();
private final String[] aliases;
static {
for (MyEnum e : values()) { // 全てのEnum要素を列挙して
for (String alias: e.aliases) { // 各Enumのインスタンス変数を参照し
lookupMap.put(alias, e); // Lookupテーブルに格納する
}
}
}
private MyEnum(String... args) {
this.aliases = args; // インスタンス変数に待避
}
public static MyEnum lookup(String s) {
return lookupMap.get(s);
}
}

もしくは、InnerClassでHolderクラスを作ります。こっちの方がスマートですね。
この場合、Holder#static{}が一番最初に呼び出されるので、MyEnumコンストラクタから直接参照できるわけです。

import java.util.HashMap;
import java.util.Map;

public enum MyEnum {
ATTR1("aliasA", "aliasB"),
ATTR2("aliasC")
;
private static final class Holder {
private static final Map<String, MyEnum> lookupMap = new HashMap<>();
}
private MyEnum(String... args) {
for (String alias : args) {
Holder.lookupMap.put(alias, this);
}
}
public static MyEnum lookup(String s) {
return lookupMap.get(s);
}
}

初期化順序確認コード

Enumと普通のクラスの初期化順を確認するコードです。

public class InitOrderTest {
public static enum MyEnum {
A, B, C;
static {
System.out.println("enum-static-init");
}
private MyEnum() {
System.out.println("enum-constructor");
}
{
System.out.println("enum-init");
}
}
public static class MyClass {
static {
System.out.println("class-static-init");
}
MyClass() {
System.out.println("class-constructor");
}
{
System.out.println("class-init");
}
}
public static void main(String[] args) {
System.out.println("main");
MyEnum.values();
new MyClass();
}
}

classの場合は、static -> {} -> ()という順番に対して、Enumの場合は、{} -> () -> staticという順番になっています。

main
enum-init
enum-constructor
enum-init
enum-constructor
enum-init
enum-constructor
enum-static-init
class-static-init
class-init
class-constructor

PipelineDBでCV定義をする時に気をつけること(時間関数編)とContinuous Transform

  • このエントリーをはてなブックマークに追加

CV(Continuous View)を定義する時に、その瞬間の時間を表すものは、arrival_timestampclock_timestamp()が使えます。
now()等も時間を返しますが、この関数はCV定義に使う事はできません。

現在の日付と時刻(現在のトランザクションの開始時)

https://www.postgresql.jp/document/9.5/html/functions-datetime.html

とあります。通常のSQLではこれでも問題ないのですが、CVはStreamに対してずっとTransactionをかけている事になっているので、実質的にPipelineDBの起動時間やCVを定義した時間が返ってくる事になります。
なのでCVでは使う事ができません。

また、click_timestamp()は1つしか定義できません。複数定義すると以下のエラーが発生します。

CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM sn_action_stream
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
AND to_timestamp((item->>'t')::bigint) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;
ERROR: clock_timestamp() may only appear once in a WHERE clause

そもそも、なぜこのようにしているかと言うと、直近3日間に到着したログだけを対象にしたいが、ログは遅延して送られてくる事が多く(数日遅れて来る場合もある..)レコードの日付も3日以内である事をチェックしたい場合などです。
その場合、レコードの時間だけを対象にすれば良いのですが、それだとPipelineDBのレコードとしてのLifetimeを設定できないのです。
WITH (max_age)使えば良いのですが。

どうしても複数使いたい場合は、サブクエリで回避できます。

CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM (SELECT arrival_timestamp, item FROM sn_action_stream WHERE to_timestamp((item->>'eventTimestamp')::bigint) > clock_timestamp() - interval '3 days') _
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;

ただし、こんな事をしなくても Continuous Transform を使うともっと良い感じにできます。
Continuous Transformとは、StreamからCVを定義し、その結果を別のStreamへ流す事ができます。つまりフィルタとしての振る舞いをします。

CREATE STREAM strm_test1 (item JSONB);
CREATE STREAM strm_test2 (item JSONB);

CREATE CONTINUOUS TRANSFORM test_etl AS
SELECT item::jsonb
FROM strm_test1
WHERE
to_timestamp((item->>'eventTimestamp')::bigint) > clock_timestamp() - interval '3 days'
THEN EXECUTE PROCEDURE pipeline_stream_insert('strm_test2')
;

このように定義して、綺麗になったtest2を参照するCVを定義します。

CREATE CONTINUOUS VIEW test2 AS
SELECT
to_char(to_timestamp((item->>'eventTimestamp')::bigint + 3600*9), 'YYYY-MM-DD HH24:MI:00') as jst,
(item->>'platform')::text as platform,
(item->>'action')::text as action,
count(*) as cnt
FROM strm_test2
WHERE
minute(arrival_timestamp) > clock_timestamp() - interval '3 days'
GROUP BY
jst, platform, action;

pipeline_stream_insertはv0.9.3で用意された関数ですが、これを使わなくても独自で定義する事ができます。
話がだんだん逸れていきますが、Continuous Transformを使えばStreamのコピーをする事もできます。
それはまた別の記事で。

デジカメを買いました

  • このエントリーをはてなブックマークに追加

EOS 80D/EF-S 18-135 IS USMレンズキットを購入しましたよ(o゜▽゜)
練習しよっと\(^O^)/