1
0
mirror of https://github.com/netsampler/goflow2.git synced 2024-05-06 15:54:52 +00:00

147 lines
3.3 KiB
Bash
Raw Permalink Normal View History

2021-05-22 16:12:26 -07:00
#!/bin/bash
# Creates the ClickHouse objects used for flow storage by piping a batch of
# SQL statements into the ClickHouse client.

# Abort the script as soon as any command exits with a non-zero status.
set -o errexit

# Multi-query mode lets one client invocation run every statement in the
# heredoc. The heredoc delimiter is unquoted, so the shell still processes
# dollar signs, backticks and backslashes inside the SQL text.
clickhouse client --multiquery <<-EOSQL
-- Database that holds lookup dictionaries.
CREATE DATABASE IF NOT EXISTS dictionaries;

-- Protocol lookup dictionary keyed by protocol number.
-- Rows are read from a CSV file with a header line, and the dictionary
-- lifetime is set to 3600.
CREATE DICTIONARY IF NOT EXISTS dictionaries.protocols
(
    proto       UInt8,
    name        String,
    description String
)
PRIMARY KEY proto
SOURCE(FILE(path '/var/lib/clickhouse/user_files/protocols.csv' format 'CSVWithNames'))
LAYOUT(FLAT())
LIFETIME(3600);
-- Kafka-backed ingestion table.
-- Reads Protobuf-encoded FlowMessage records (schema file flow.proto) from
-- the flows topic on kafka:9092 as consumer group clickhouse.
-- Both timestamp columns are plain unsigned integers here. Going by the
-- _ns suffix they presumably hold nanoseconds since the Unix epoch. TODO confirm.
CREATE TABLE IF NOT EXISTS flows
(
    time_received_ns   UInt64,
    time_flow_start_ns UInt64,

    sequence_num       UInt32,
    sampling_rate      UInt64,
    sampler_address    FixedString(16),

    src_addr           FixedString(16),
    dst_addr           FixedString(16),

    src_as             UInt32,
    dst_as             UInt32,

    etype              UInt32,
    proto              UInt32,

    src_port           UInt32,
    dst_port           UInt32,

    bytes              UInt64,
    packets            UInt64
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list  = 'flows',
    kafka_group_name  = 'clickhouse',
    kafka_format      = 'Protobuf',
    kafka_schema      = 'flow.proto:FlowMessage';
2021-05-22 16:12:26 -07:00
-- Persistent per-flow storage.
-- Carries the same flow fields as the Kafka table, with the timestamps typed
-- as DateTime64(9), plus a calendar date used as the partition key and an
-- insertion timestamp. Rows are sorted by receive time.
CREATE TABLE IF NOT EXISTS flows_raw
(
    date               Date,

    time_inserted_ns   DateTime64(9),
    time_received_ns   DateTime64(9),
    time_flow_start_ns DateTime64(9),

    sequence_num       UInt32,
    sampling_rate      UInt64,
    sampler_address    FixedString(16),

    src_addr           FixedString(16),
    dst_addr           FixedString(16),

    src_as             UInt32,
    dst_as             UInt32,

    etype              UInt32,
    proto              UInt32,

    src_port           UInt32,
    dst_port           UInt32,

    bytes              UInt64,
    packets            UInt64
)
ENGINE = MergeTree()
PARTITION BY date
ORDER BY time_received_ns;
2021-05-22 16:12:26 -07:00
-- Materialized view that copies every row read from the Kafka table flows
-- into flows_raw, converting the integer timestamps to DateTime64(9).
--
-- Timestamp handling:
--   * The integer timestamps are converted with fromUnixTimestamp64Nano, which
--     works on the integer value directly. Dividing by 1000000000 first would
--     go through Float64, which cannot represent a present-day
--     epoch-nanosecond value exactly and silently drops the low digits.
--   * time_inserted_ns uses now64(9), because now() only has one-second
--     resolution while the target column is DateTime64(9).
--   * NOTE(review): toDate(time_received_ns) is expected to resolve to the
--     DateTime64 alias defined in this same SELECT (ClickHouse prefers aliases
--     over column names by default), not to the raw UInt64 column. Confirm
--     that prefer_column_name_to_alias is not enabled on the server.
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_raw_view TO flows_raw
AS
SELECT
    toDate(time_received_ns) AS date,
    now64(9) AS time_inserted_ns,
    fromUnixTimestamp64Nano(toInt64(time_received_ns)) AS time_received_ns,
    fromUnixTimestamp64Nano(toInt64(time_flow_start_ns)) AS time_flow_start_ns,
    sequence_num,
    sampling_rate,
    sampler_address,
    src_addr,
    dst_addr,
    src_as,
    dst_as,
    etype,
    proto,
    src_port,
    dst_port,
    bytes,
    packets
FROM flows;
-- Aggregated traffic table, one row per date, timeslot and AS pair.
-- The nested etypeMap structure keeps a per-etype breakdown (bytes, packets,
-- count) next to the overall totals. The engine is SummingMergeTree,
-- partitioned by date.
CREATE TABLE IF NOT EXISTS flows_5m
(
    date     Date,
    timeslot DateTime,

    src_as   UInt32,
    dst_as   UInt32,

    etypeMap Nested
    (
        etype   UInt32,
        bytes   UInt64,
        packets UInt64,
        count   UInt64
    ),

    bytes    UInt64,
    packets  UInt64,
    count    UInt64
)
ENGINE = SummingMergeTree()
PARTITION BY date
ORDER BY
(
    date,
    timeslot,
    src_as,
    dst_as,
    \`etypeMap.etype\`
);
2021-05-22 16:12:26 -07:00
-- Materialized view that rolls rows from flows_raw up into flows_5m.
-- Rows are grouped by date, five-minute timeslot, source AS, destination AS
-- and etype. Each group writes its totals twice: as single-element arrays
-- into the etypeMap nested columns and as plain bytes, packets and count.
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_5m_view TO flows_5m
AS
SELECT
    date,
    toStartOfFiveMinute(time_received_ns) AS timeslot,
    src_as,
    dst_as,

    [etype]   AS \`etypeMap.etype\`,
    [bytes]   AS \`etypeMap.bytes\`,
    [packets] AS \`etypeMap.packets\`,
    [count]   AS \`etypeMap.count\`,

    sum(bytes)   AS bytes,
    sum(packets) AS packets,
    count()      AS count
FROM flows_raw
GROUP BY
    date,
    timeslot,
    src_as,
    dst_as,
    \`etypeMap.etype\`;
EOSQL