mirror of
https://github.com/netsampler/goflow2.git
synced 2024-05-06 15:54:52 +00:00
147 lines
3.3 KiB
Bash
Executable File
147 lines
3.3 KiB
Bash
Executable File
#!/bin/bash
|
|
set -e
|
|
|
|
clickhouse client -n <<-EOSQL
|
|
|
|
CREATE DATABASE IF NOT EXISTS dictionaries;
|
|
|
|
CREATE DICTIONARY IF NOT EXISTS dictionaries.protocols (
|
|
proto UInt8,
|
|
name String,
|
|
description String
|
|
)
|
|
PRIMARY KEY proto
|
|
LAYOUT(FLAT())
|
|
SOURCE (FILE(path '/var/lib/clickhouse/user_files/protocols.csv' format 'CSVWithNames'))
|
|
LIFETIME(3600);
|
|
|
|
CREATE TABLE IF NOT EXISTS flows
|
|
(
|
|
time_received_ns UInt64,
|
|
time_flow_start_ns UInt64,
|
|
|
|
sequence_num UInt32,
|
|
sampling_rate UInt64,
|
|
sampler_address FixedString(16),
|
|
|
|
src_addr FixedString(16),
|
|
dst_addr FixedString(16),
|
|
|
|
src_as UInt32,
|
|
dst_as UInt32,
|
|
|
|
etype UInt32,
|
|
proto UInt32,
|
|
|
|
src_port UInt32,
|
|
dst_port UInt32,
|
|
|
|
bytes UInt64,
|
|
packets UInt64
|
|
) ENGINE = Kafka()
|
|
SETTINGS
|
|
kafka_broker_list = 'kafka:9092',
|
|
kafka_topic_list = 'flows',
|
|
kafka_group_name = 'clickhouse',
|
|
kafka_format = 'Protobuf',
|
|
kafka_schema = 'flow.proto:FlowMessage';
|
|
|
|
CREATE TABLE IF NOT EXISTS flows_raw
|
|
(
|
|
date Date,
|
|
time_inserted_ns DateTime64(9),
|
|
time_received_ns DateTime64(9),
|
|
time_flow_start_ns DateTime64(9),
|
|
|
|
sequence_num UInt32,
|
|
sampling_rate UInt64,
|
|
sampler_address FixedString(16),
|
|
|
|
src_addr FixedString(16),
|
|
dst_addr FixedString(16),
|
|
|
|
src_as UInt32,
|
|
dst_as UInt32,
|
|
|
|
etype UInt32,
|
|
proto UInt32,
|
|
|
|
src_port UInt32,
|
|
dst_port UInt32,
|
|
|
|
bytes UInt64,
|
|
packets UInt64
|
|
) ENGINE = MergeTree()
|
|
PARTITION BY date
|
|
ORDER BY time_received_ns;
|
|
|
|
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_raw_view TO flows_raw
|
|
AS SELECT
|
|
toDate(time_received_ns) AS date,
|
|
now() AS time_inserted_ns,
|
|
toDateTime64(time_received_ns/1000000000, 9) AS time_received_ns,
|
|
toDateTime64(time_flow_start_ns/1000000000, 9) AS time_flow_start_ns,
|
|
sequence_num,
|
|
sampling_rate,
|
|
sampler_address,
|
|
|
|
src_addr,
|
|
dst_addr,
|
|
|
|
src_as,
|
|
dst_as,
|
|
|
|
etype,
|
|
proto,
|
|
|
|
src_port,
|
|
dst_port,
|
|
|
|
bytes,
|
|
packets
|
|
FROM flows;
|
|
|
|
CREATE TABLE IF NOT EXISTS flows_5m
|
|
(
|
|
date Date,
|
|
timeslot DateTime,
|
|
|
|
src_as UInt32,
|
|
dst_as UInt32,
|
|
|
|
etypeMap Nested (
|
|
etype UInt32,
|
|
bytes UInt64,
|
|
packets UInt64,
|
|
count UInt64
|
|
),
|
|
|
|
bytes UInt64,
|
|
packets UInt64,
|
|
count UInt64
|
|
) ENGINE = SummingMergeTree()
|
|
PARTITION BY date
|
|
ORDER BY (date, timeslot, src_as, dst_as, \`etypeMap.etype\`);
|
|
|
|
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_5m_view TO flows_5m
|
|
AS
|
|
SELECT
|
|
date,
|
|
toStartOfFiveMinute(time_received_ns) AS timeslot,
|
|
src_as,
|
|
dst_as,
|
|
|
|
[etype] AS \`etypeMap.etype\`,
|
|
[bytes] AS \`etypeMap.bytes\`,
|
|
[packets] AS \`etypeMap.packets\`,
|
|
[count] AS \`etypeMap.count\`,
|
|
|
|
sum(bytes) AS bytes,
|
|
sum(packets) AS packets,
|
|
count() AS count
|
|
|
|
FROM flows_raw
|
|
GROUP BY date, timeslot, src_as, dst_as, \`etypeMap.etype\`;
|
|
|
|
EOSQL
|