===== Общая схема мониторинга =====
switch sflow -> goflow2 -> kafka -> clickhouse
- switch отправляет данные sFlow на UDP-порт 6343, который слушает goflow2
- goflow2 принимает поток данных, разбирает его, формирует сообщения protobuf и отправляет их в kafka (топик sflow)
- kafka только принимает данные и хранит их на протяжении 12 часов
- clickhouse — в базе traffic используются три объекта:
- kafka — таблица на движке Kafka, подключённая на чтение к топику sflow; каждое сообщение читается из неё только один раз
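- kafka_to_raw — материализованное представление, которое выбирает данные из таблицы kafka, добавляет вычисляемые поля (IP-адреса в текстовом виде, ASN, страну, название протокола) и вставляет результат в raw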
- raw — таблица с сырыми (обогащёнными) данными; строки хранятся 7 дней (TTL)
==== switch ====
Примеры значений sampling rate для портов коммутаторов разной скорости:
sampling.1G = 8192
sampling.10G = 8192
sampling.40G = 16384
sampling.100G = 32768
==== goflow2 ====
Файл /etc/default/goflow2:
# goflow2 options: receive sFlow on 172.17.172.111:6343 and publish each flow as a
# length-prefixed protobuf message (-format.protobuf.fixedlen=true, the framing that
# ClickHouse kafka_format = 'Protobuf' reads) to the Kafka topic "sflow".
# -reuseport=true is given once (it was duplicated before; repeating it changes nothing).
GOFLOW2_ARGS=-format pb -format.protobuf.fixedlen=true -listen=sflow://172.17.172.111:6343 -reuseport=true -transport=kafka -transport.kafka.brokers=172.17.172.111:9092 -transport.kafka.hashing=true -transport.kafka.log.err=true -transport.kafka.topic=sflow -workers=2
==== kafka ====
== Установка ==
# Create a dedicated "kafka" user with a home directory and set its password
useradd kafka -m
passwd kafka
# The remaining commands are run as the kafka user, from /home/kafka
su -l kafka
cd
# -f: fail on HTTP errors, -L: follow redirects, -O: save under the remote file name.
# -O takes no value: a file name written after it would be parsed as a second URL.
curl -fLO https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xvzf kafka_2.13-2.8.0.tgz
# Version-independent path used by the systemd units (/home/kafka/kafka/...)
ln -s /home/kafka/kafka_2.13-2.8.0 /home/kafka/kafka
/home/kafka/kafka/config/server.properties
# Allow "kafka-topics.sh --delete" to really remove topics (Kafka's default is already true since 1.0; set explicitly)
delete.topic.enable=true
/etc/systemd/system/zookeeper.service
[Unit]
# ZooKeeper shipped with the Kafka distribution; kafka.service declares Requires=/After= on this unit.
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
# Runs as the unprivileged kafka user that owns /home/kafka/kafka
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
# Restart only on an unclean exit (killed by a signal, timeout, watchdog), not on a non-zero exit code
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
/etc/systemd/system/kafka.service
[Unit]
# Kafka broker; starting it also starts zookeeper.service and waits for it.
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
# Broker stdout/stderr are APPENDED (>>) to kafka.log. With ">" the file was truncated on
# every start, so an automatic restart (Restart=on-abnormal) erased the output of the
# run that had just crashed.
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties >> /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Запускаем сервисы (zookeeper запускается автоматически как зависимость kafka.service):
# Re-read the new unit files, then enable kafka at boot and start it immediately.
# zookeeper.service is brought up through kafka.service's Requires= dependency.
systemctl daemon-reload
systemctl enable --now kafka
Работа с топиками kafka:
# List topics (--zookeeper works in Kafka 2.8; it was removed in 3.0 in favour of --bootstrap-server)
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
# Describe one topic: --describe takes no value, the topic name goes in --topic
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-1
# Delete a topic (needs delete.topic.enable=true)
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-1
# Create the sflow topic written by goflow2 and read by ClickHouse.
# retention.ms=43200000 keeps messages for 12 hours (12 * 3600 * 1000 ms).
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --config compression.type=producer --config cleanup.policy=delete --config retention.ms=43200000 --topic sflow --partitions 10 --replication-factor 1
==== clickhouse ====
=== дополнительная информация ===
[[https://github.com/netsampler/goflow2/blob/main/pb/flow.proto|https://github.com/netsampler/goflow2/blob/main/pb/flow.proto]] \\
[[https://clickhouse.tech/docs/ru/engines/table-engines/integrations/kafka/|https://clickhouse.tech/docs/ru/engines/table-engines/integrations/kafka/]] \\
[[https://github.com/AlexeyKupershtokh/clickhouse-maxmind-geoip/|https://github.com/AlexeyKupershtokh/clickhouse-maxmind-geoip/]] \\
[[https://gist.github.com/sanchezzzhak/511fd140e8809857f8f1d84ddb937015|https://gist.github.com/sanchezzzhak/511fd140e8809857f8f1d84ddb937015]]
=== таблица kafka ===
-- Kafka-engine consumer of the "sflow" topic. Each message is a goflow2 FlowMessage
-- decoded with flow.proto (the file must be in the server's format_schema_path).
-- Protobuf fields are matched to these columns by name.
CREATE TABLE traffic.kafka
(
    -- timing / sampling
    `TimeReceived`      UInt64,
    `SequenceNum`       UInt64,
    `SamplingRate`      UInt64,
    `FlowDirection`     UInt32,
    -- raw address bytes; decoded to text in traffic.kafka_to_raw
    `SamplerAddress`    FixedString(16),
    `TimeFlowStart`     UInt64,
    `TimeFlowEnd`       UInt64,
    -- sampled counters (multiplied by SamplingRate in traffic.kafka_to_raw)
    `Bytes`             UInt64,
    `Packets`           UInt64,
    `SrcAddr`           FixedString(16),
    `DstAddr`           FixedString(16),
    `EType`             UInt32,
    `Proto`             UInt32,
    `SrcPort`           UInt32,
    `DstPort`           UInt32,
    `InIf`              UInt32,
    `OutIf`             UInt32,
    -- read as String here; converted with toUInt64 + MACNumToString downstream
    `SrcMac`            String,
    `DstMac`            String,
    `SrcVlan`           UInt32,
    `DstVlan`           UInt32,
    `VlanId`            UInt32,
    `IngressVrfID`      UInt32,
    `EgressVrfID`       UInt32,
    `IPTos`             UInt32,
    `ForwardingStatus`  UInt32,
    `IPTTL`             UInt32,
    `TCPFlags`          UInt32,
    `IcmpType`          UInt32,
    `IcmpCode`          UInt32,
    `IPv6FlowLabel`     UInt32,
    `FragmentId`        UInt32,
    `FragmentOffset`    UInt32,
    `BiFlowDirection`   UInt32,
    `SrcAs`             UInt32,
    `DstAs`             UInt32,
    `NextHop`           FixedString(16),
    `NextHopAS`         UInt32,
    `SrcNet`            UInt32,
    `DstNet`            UInt32,
    -- MPLS label stack
    `HasMPLS`           UInt8,
    `MPLSCount`         UInt32,
    `MPLS1TTL`          UInt32,
    `MPLS1Label`        UInt32,
    `MPLS2TTL`          UInt32,
    `MPLS2Label`        UInt32,
    `MPLS3TTL`          UInt32,
    `MPLS3Label`        UInt32,
    `MPLSLastTTL`       UInt32,
    `MPLSLastLabel`     UInt32
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list   = '172.17.172.111:9092',
    kafka_topic_list    = 'sflow',
    kafka_group_name    = 'clickhouse',
    kafka_format        = 'Protobuf',
    kafka_schema        = 'flow.proto:FlowMessage',
    kafka_num_consumers = 6
=== таблица raw ===
-- Enriched flow records, filled by the materialized view traffic.kafka_to_raw.
-- Partitioned by day; each row expires 7 days after its Date.
CREATE TABLE traffic.raw
(
    `Date` Date,
    `Datetime` DateTime,
    `SequenceNum` UInt64,
    `SamplingRate` UInt64,
    `FlowDirection` UInt32,
    `SamplerAddress` String,
    `TimeFlowStart` DateTime,
    `TimeFlowEnd` DateTime,
    -- already multiplied by SamplingRate in kafka_to_raw
    `Bytes` UInt64,
    `Packets` UInt64,
    `SrcIp` String,
    `DstIp` String,
    `EType` UInt32,
    `Proto` String,
    `SrcPort` UInt32,
    `DstPort` UInt32,
    `InIf` UInt32,
    `OutIf` UInt32,
    `SrcMac` String,
    `DstMac` String,
    `SrcVlan` UInt32,
    `DstVlan` UInt32,
    `VlanId` UInt32,
    `IngressVrfID` UInt32,
    `EgressVrfID` UInt32,
    `IPTos` UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL` UInt32,
    `TCPFlags` UInt32,
    `IcmpType` UInt32,
    `IcmpCode` UInt32,
    `IPv6FlowLabel` UInt32,
    `FragmentId` UInt32,
    `FragmentOffset` UInt32,
    `BiFlowDirection` UInt32,
    -- geoname_id from the contryv4/contryv6 dictionaries
    `SrcGeo` UInt32,
    `DstGeo` UInt32,
    `SrcAs` UInt32,
    `DstAs` UInt32,
    `SrcCountry` String,
    `DstCountry` String,
    `NextHop` String,
    `NextHopAS` UInt32,
    `SrcNet` String,
    `DstNet` String,
    `HasMPLS` UInt8,
    `MPLSCount` UInt32,
    `MPLS1TTL` UInt32,
    `MPLS1Label` UInt32,
    `MPLS2TTL` UInt32,
    `MPLS2Label` UInt32,
    `MPLS3TTL` UInt32,
    `MPLS3Label` UInt32,
    `MPLSLastTTL` UInt32,
    `MPLSLastLabel` UInt32
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(Date)
ORDER BY Datetime
-- TTL takes a date/time expression: Date + 7 days (the "+" was missing, a syntax error).
TTL Date + toIntervalDay(7)
SETTINGS index_granularity = 8192
=== материализованное представление kafka_to_raw ===
-- Pushes every message consumed from traffic.kafka into traffic.raw. It decodes the
-- binary address fields and enriches rows through the dictionaries.* lookups.
-- The column list mirrors traffic.raw, including SrcGeo/DstGeo, which the SELECT produces.
CREATE MATERIALIZED VIEW traffic.kafka_to_raw TO traffic.raw
(
    `Date` Date,
    `Datetime` DateTime,
    `SequenceNum` UInt64,
    `SamplingRate` UInt64,
    `FlowDirection` UInt32,
    `SamplerAddress` String,
    `TimeFlowStart` DateTime,
    `TimeFlowEnd` DateTime,
    `Bytes` UInt64,
    `Packets` UInt64,
    `SrcIp` String,
    `DstIp` String,
    `EType` UInt32,
    `Proto` String,
    `SrcPort` UInt32,
    `DstPort` UInt32,
    `InIf` UInt32,
    `OutIf` UInt32,
    `SrcMac` String,
    `DstMac` String,
    `SrcVlan` UInt32,
    `DstVlan` UInt32,
    `VlanId` UInt32,
    `IngressVrfID` UInt32,
    `EgressVrfID` UInt32,
    `IPTos` UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL` UInt32,
    `TCPFlags` UInt32,
    `IcmpType` UInt32,
    `IcmpCode` UInt32,
    `IPv6FlowLabel` UInt32,
    `FragmentId` UInt32,
    `FragmentOffset` UInt32,
    `BiFlowDirection` UInt32,
    `SrcGeo` UInt32,
    `DstGeo` UInt32,
    `SrcAs` UInt32,
    `DstAs` UInt32,
    `SrcCountry` String,
    `DstCountry` String,
    `NextHop` String,
    `NextHopAS` UInt32,
    `SrcNet` String,
    `DstNet` String,
    `HasMPLS` UInt8,
    `MPLSCount` UInt32,
    `MPLS1TTL` UInt32,
    `MPLS1Label` UInt32,
    `MPLS2TTL` UInt32,
    `MPLS2Label` UInt32,
    `MPLS3TTL` UInt32,
    `MPLS3Label` UInt32,
    `MPLSLastTTL` UInt32,
    `MPLSLastLabel` UInt32
) AS
SELECT
    -- Date is taken from the same DateTime as Datetime, so both columns always agree
    toDate(toDateTime(TimeReceived)) AS Date,
    TimeReceived AS Datetime,
    -- IPv4 decoding used throughout: the expression assumes the 4 address bytes sit at the
    -- start of the FixedString(16). reverse() + substring(13, 4) yields them byte-swapped,
    -- and reinterpretAsUInt32 (little-endian) turns that into the numeric IPv4 value.
    IPv4NumToString(reinterpretAsUInt32(substring(reverse(SamplerAddress), 13, 4))) AS SamplerAddress,
    -- scale sampled counters by the sampling rate to estimate real volume
    Bytes * SamplingRate AS Bytes,
    Packets * SamplingRate AS Packets,
    -- ASN lookup: 0x0800 = IPv4, 0x86DD = IPv6, anything else -> 0
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(SrcAddr)), 0) AS SrcAs,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(DstAddr)), 0) AS DstAs,
    -- country geoname_id lookup, same family dispatch
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(SrcAddr)), 0) AS SrcGeo,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(DstAddr)), 0) AS DstGeo,
    -- textual addresses; empty string for non-IP ethertypes
    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
            EType = 0x86DD, IPv6NumToString(SrcAddr), '') AS SrcIp,
    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
            EType = 0x86DD, IPv6NumToString(DstAddr), '') AS DstIp,
    dictGet('dictionaries.protocols', 'name', toUInt64(Proto)) AS Proto,
    MACNumToString(toUInt64(SrcMac)) AS SrcMac,
    MACNumToString(toUInt64(DstMac)) AS DstMac,
    -- SrcGeo/DstGeo here refer to the aliases computed above
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(SrcGeo)) AS SrcCountry,
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(DstGeo)) AS DstCountry,
    -- passed through unchanged
    SequenceNum,
    SamplingRate,
    FlowDirection,
    TimeFlowStart,
    TimeFlowEnd,
    EType,
    SrcPort,
    DstPort,
    InIf,
    OutIf,
    SrcVlan,
    DstVlan,
    VlanId,
    IngressVrfID,
    EgressVrfID,
    IPTos,
    ForwardingStatus,
    IPTTL,
    TCPFlags,
    IcmpType,
    IcmpCode,
    IPv6FlowLabel,
    FragmentId,
    FragmentOffset,
    BiFlowDirection,
    -- NOTE(review): NextHop is copied as the raw 16-byte FixedString into a String column,
    -- not decoded to text like SrcIp/DstIp; confirm this is intended.
    NextHop,
    NextHopAS,
    -- NOTE(review): SrcNet/DstNet are UInt32 in traffic.kafka but String in traffic.raw,
    -- so they are stored as decimal text on insert; confirm this is intended.
    SrcNet,
    DstNet,
    HasMPLS,
    MPLSCount,
    MPLS1TTL,
    MPLS1Label,
    MPLS2TTL,
    MPLS2Label,
    MPLS3TTL,
    MPLS3Label,
    MPLSLastTTL,
    MPLSLastLabel
FROM traffic.kafka
=== Словари ===
protocols - словарь для текстового представления протоколов
-- protocols: IP protocol number -> name; kafka_to_raw fills the Proto column from it.
-- Simple-key layouts such as FLAT use a UInt64 key, so proto is declared UInt64
-- (kafka_to_raw already looks it up with toUInt64(Proto)).
CREATE DICTIONARY dictionaries.protocols (
    `proto` UInt64,
    `name` String,
    `description` String
)
PRIMARY KEY proto
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/protocols.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(FLAT())
asnv4 - конвертация ipv4 к ASN
-- asnv4: IPv4 prefix -> AS number, longest-prefix match (IP_TRIE).
-- kafka_to_raw queries it with tuple(<IPv4 address as UInt32>).
CREATE DICTIONARY dictionaries.asnv4 (
    `network` String,          -- prefix in CIDR notation (IP_TRIE key)
    `asn`     UInt32,
    `aso`     String DEFAULT '??'  -- presumably the AS organisation name; not used by kafka_to_raw
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/ip-asn-ipv4.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
asnv6 - конвертация ipv6 к ASN
-- asnv6: IPv6 prefix -> AS number, longest-prefix match (IP_TRIE).
-- kafka_to_raw queries it with tuple(<FixedString(16) IPv6 address>).
CREATE DICTIONARY dictionaries.asnv6 (
    `network` String,          -- prefix in CIDR notation (IP_TRIE key)
    `asn`     UInt32,
    `aso`     String DEFAULT '??'  -- presumably the AS organisation name; not used by kafka_to_raw
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/ip-asn-ipv6.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
contryen — справочник стран: geoname_id → ISO-код, название страны и континента
-- contryen: geoname_id -> country attributes; kafka_to_raw reads country_iso_code
-- for SrcCountry/DstCountry. `id` is not a ClickHouse data type, and the HASHED layout
-- takes a UInt64 key (kafka_to_raw looks it up with toUInt64(SrcGeo/DstGeo)).
-- NOTE(review): the file name 'coutry-location.csv' is spelled differently from
-- country-ipv4/ipv6.csv; confirm it matches the file on disk.
CREATE DICTIONARY dictionaries.contryen
(
    `geoname_id` UInt64,
    `locale_code` String,
    `continent_code` String,
    `continent_name` String,
    `country_iso_code` String,
    `country_name` String,
    `is_in_european_union` UInt8
)
PRIMARY KEY geoname_id
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/coutry-location.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(HASHED())
contryv4 — конвертация ipv4 к geoname_id страны
-- contryv4: IPv4 prefix -> country geoname_id, longest-prefix match (IP_TRIE).
-- kafka_to_raw reads geoname_id into SrcGeo/DstGeo; missing values default to 0.
CREATE DICTIONARY dictionaries.contryv4 (
    `network`                        String,            -- CIDR prefix (IP_TRIE key)
    `geoname_id`                     UInt32 DEFAULT 0,
    `registered_country_geoname_id`  UInt32 DEFAULT 0,
    `represented_country_geoname_id` UInt32 DEFAULT 0,
    `is_anonymous_proxy`             UInt8  DEFAULT 0,
    `is_satellite_provider`          UInt8  DEFAULT 0
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/country-ipv4.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
contryv6 — конвертация ipv6 к geoname_id страны
-- contryv6: IPv6 prefix -> country geoname_id, longest-prefix match (IP_TRIE).
-- kafka_to_raw reads geoname_id into SrcGeo/DstGeo; missing values default to 0.
CREATE DICTIONARY dictionaries.contryv6 (
    `network`                        String,            -- CIDR prefix (IP_TRIE key)
    `geoname_id`                     UInt32 DEFAULT 0,
    `registered_country_geoname_id`  UInt32 DEFAULT 0,
    `represented_country_geoname_id` UInt32 DEFAULT 0,
    `is_anonymous_proxy`             UInt8  DEFAULT 0,
    `is_satellite_provider`          UInt8  DEFAULT 0
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/country-ipv6.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
switches — соответствие IP-адреса коммутатора и его имени
-- switches: switch IP address (as a string key) -> switch name.
-- A String key is not a simple UInt64 key, hence the COMPLEX_KEY_HASHED layout.
CREATE DICTIONARY dictionaries.switches (
    `ip`   String,
    `name` String
)
PRIMARY KEY ip
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/switches.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(COMPLEX_KEY_HASHED())