This is an old revision of the document!
switch sflow -> goflow2 -> kafka -> clickhouse
примеры конфигов для свитчей
sampling.1G = 8192 sampling.10G = 8192 sampling.40G = 16384 sampling.100G = 32768
file /etc/default/goflow2
# Command-line arguments for the goflow2 collector: listen for sFlow on
# 172.17.172.111:6343 and publish length-prefixed protobuf messages to the
# Kafka topic "sflow" on broker 172.17.172.111:9092.
# Each flag is given exactly once and in -name=value form.
GOFLOW2_ARGS=-format=pb -format.protobuf.fixedlen=true -listen=sflow://172.17.172.111:6343 -reuseport=true -transport=kafka -transport.kafka.brokers=172.17.172.111:9092 -transport.kafka.hashing=true -transport.kafka.log.err=true -transport.kafka.topic=sflow -workers=2
# Create a dedicated "kafka" user with a home directory and set its password.
useradd kafka -m
passwd kafka
# Continue as the kafka user, in its home directory.
su -l kafka
cd
# Download Kafka 2.8.0 (Scala 2.13 build). -O saves the file under its remote
# name and takes no argument, so the URL is the only operand.
curl -O https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xvzf kafka_2.13-2.8.0.tgz
# Stable path used by the systemd units and the commands on this page.
ln -s /home/kafka/kafka_2.13-2.8.0 /home/kafka/kafka
/home/kafka/kafka/config/server.properties
# Allow topics to be deleted (this page later runs `kafka-topics.sh --delete`).
delete.topic.enable=true
/etc/systemd/system/zookeeper.service
# systemd unit that runs the ZooKeeper instance shipped with Kafka
# as the "kafka" user.
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
/etc/systemd/system/kafka.service
# systemd unit for the Kafka broker. It requires zookeeper.service and is
# ordered after it, so starting kafka also starts ZooKeeper.
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
# Run through a shell so stdout and stderr can be redirected to kafka.log.
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties> /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
Запускаем сервисы
# Make systemd pick up the new unit files.
systemctl daemon-reload
# Start the broker now (kafka.service requires zookeeper.service).
systemctl start kafka
# Start the broker automatically on boot.
systemctl enable kafka
Работаем с kafka
# List all topics.
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
# Describe one topic; the topic name must be passed via --topic.
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-1
# Delete a topic.
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-1
# Create the "sflow" topic that goflow2 writes to and ClickHouse reads from.
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --config compression.type=producer --config cleanup.policy=delete --topic sflow --partitions 10 --replication-factor 1
https://github.com/netsampler/goflow2/blob/main/pb/flow.proto
https://clickhouse.tech/docs/ru/engines/table-engines/integrations/kafka/
https://github.com/AlexeyKupershtokh/clickhouse-maxmind-geoip/
https://gist.github.com/sanchezzzhak/511fd140e8809857f8f1d84ddb937015
-- Kafka-engine queue table: consumes the "sflow" topic from the broker at
-- 172.17.172.111:9092 and decodes each message as Protobuf using the
-- FlowMessage type from flow.proto. Address fields are kept as raw
-- 16-byte FixedString values.
CREATE TABLE traffic.kafka
(
    -- Flow metadata and counters
    `TimeReceived`     UInt64,
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   FixedString(16),
    `TimeFlowStart`    UInt64,
    `TimeFlowEnd`      UInt64,
    `Bytes`            UInt64,
    `Packets`          UInt64,

    -- Addresses, protocol and ports
    `SrcAddr`          FixedString(16),
    `DstAddr`          FixedString(16),
    `EType`            UInt32,
    `Proto`            UInt32,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,

    -- Interfaces and layer 2
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,

    -- IP / TCP / ICMP header fields
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,

    -- Routing information
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `NextHop`          FixedString(16),
    `NextHopAS`        UInt32,
    `SrcNet`           UInt32,
    `DstNet`           UInt32,

    -- MPLS
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list   = '172.17.172.111:9092',
    kafka_topic_list    = 'sflow',
    kafka_group_name    = 'clickhouse',
    kafka_format        = 'Protobuf',
    kafka_schema        = 'flow.proto:FlowMessage',
    kafka_num_consumers = 6;
-- Storage table for decoded flow records. Addresses, protocol and MAC values
-- are stored as strings; each row also carries geo/ASN enrichment columns.
-- Data is partitioned per day, sorted by Datetime and expires 7 days after Date.
CREATE TABLE traffic.raw
(
    -- Receive time
    `Date`             Date,
    `Datetime`         DateTime,

    -- Flow metadata and counters
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   String,
    `TimeFlowStart`    DateTime,
    `TimeFlowEnd`      DateTime,
    `Bytes`            UInt64,
    `Packets`          UInt64,

    -- Addresses, protocol and ports
    `SrcIp`            String,
    `DstIp`            String,
    `EType`            UInt32,
    `Proto`            String,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,

    -- Interfaces and layer 2
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,

    -- IP / TCP / ICMP header fields
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,

    -- Geo / ASN enrichment
    `SrcGeo`           UInt32,
    `DstGeo`           UInt32,
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `SrcCountry`       String,
    `DstCountry`       String,

    -- Routing information
    `NextHop`          String,
    `NextHopAS`        UInt32,
    `SrcNet`           String,
    `DstNet`           String,

    -- MPLS
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(Date)
ORDER BY Datetime
TTL Date + INTERVAL 7 DAY
SETTINGS index_granularity = 8192;
-- Materialized view that moves every batch consumed from traffic.kafka into
-- traffic.raw, converting and enriching the fields on the way:
--   * sampled byte/packet counters are multiplied by the sampling rate;
--   * binary addresses are rendered as text;
--   * ASN, geoname id and country code are looked up in dictionaries.
-- The declared column list includes SrcGeo/DstGeo so that it matches both the
-- SELECT below and the traffic.raw target table.
--
-- IPv4 extraction idiom used throughout:
--   reinterpretAsUInt32(substring(reverse(addr), 13, 4))
-- takes the first four bytes of the 16-byte value in reversed byte order and
-- reinterprets them as a UInt32.
CREATE MATERIALIZED VIEW traffic.kafka_to_raw TO traffic.raw
(
    `Date`             Date,
    `Datetime`         DateTime,
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   String,
    `TimeFlowStart`    DateTime,
    `TimeFlowEnd`      DateTime,
    `Bytes`            UInt64,
    `Packets`          UInt64,
    `SrcIp`            String,
    `DstIp`            String,
    `EType`            UInt32,
    `Proto`            String,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,
    `SrcGeo`           UInt32,
    `DstGeo`           UInt32,
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `SrcCountry`       String,
    `DstCountry`       String,
    `NextHop`          String,
    `NextHopAS`        UInt32,
    `SrcNet`           String,
    `DstNet`           String,
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
) AS
SELECT
    -- Receive time.
    toDate(TimeReceived) AS Date,
    TimeReceived AS Datetime,

    -- Exporter address, rendered as dotted IPv4 text.
    IPv4NumToString(reinterpretAsUInt32(substring(reverse(SamplerAddress), 13, 4))) AS SamplerAddress,

    -- Scale sampled counters by the sampling rate.
    Bytes * SamplingRate AS Bytes,
    Packets * SamplingRate AS Packets,

    -- ASN lookup: 0x0800 = IPv4 EtherType, 0x86DD = IPv6 EtherType, otherwise 0.
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(SrcAddr)), 0) AS SrcAs,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(DstAddr)), 0) AS DstAs,

    -- Geoname id lookup, same address-family dispatch as above.
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(SrcAddr)), 0) AS SrcGeo,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(DstAddr)), 0) AS DstGeo,

    -- Textual source/destination addresses; empty string for other EtherTypes.
    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
            EType = 0x86DD, IPv6NumToString(SrcAddr), '') AS SrcIp,
    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
            EType = 0x86DD, IPv6NumToString(DstAddr), '') AS DstIp,

    -- Protocol number -> protocol name.
    dictGet('dictionaries.protocols', 'name', toUInt64(Proto)) AS Proto,

    -- MAC addresses rendered as text.
    MACNumToString(toUInt64(SrcMac)) AS SrcMac,
    MACNumToString(toUInt64(DstMac)) AS DstMac,

    -- Country ISO code resolved from the geoname ids computed above.
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(SrcGeo)) AS SrcCountry,
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(DstGeo)) AS DstCountry,

    -- Columns passed through unchanged.
    -- NOTE(review): TimeFlowStart/TimeFlowEnd (UInt64), NextHop (FixedString(16))
    -- and SrcNet/DstNet (UInt32) rely on implicit conversion to the DateTime /
    -- String columns of traffic.raw; NextHop in particular is not rendered as an
    -- IP string here - confirm this is intended.
    SequenceNum,
    SamplingRate,
    FlowDirection,
    TimeFlowStart,
    TimeFlowEnd,
    EType,
    SrcPort,
    DstPort,
    InIf,
    OutIf,
    SrcVlan,
    DstVlan,
    VlanId,
    IngressVrfID,
    EgressVrfID,
    IPTos,
    ForwardingStatus,
    IPTTL,
    TCPFlags,
    IcmpType,
    IcmpCode,
    IPv6FlowLabel,
    FragmentId,
    FragmentOffset,
    BiFlowDirection,
    NextHop,
    NextHopAS,
    SrcNet,
    DstNet,
    HasMPLS,
    MPLSCount,
    MPLS1TTL,
    MPLS1Label,
    MPLS2TTL,
    MPLS2Label,
    MPLS3TTL,
    MPLS3Label,
    MPLSLastTTL,
    MPLSLastLabel
FROM traffic.kafka;
protocols - словарь для текстового представления протоколов
-- Protocol number -> name/description lookup, loaded from protocols.csv.
-- NOTE(review): the key is declared as UInt8 while lookups pass toUInt64(...);
-- confirm the declared key type is what the FLAT layout expects.
CREATE DICTIONARY dictionaries.protocols
(
    `proto`       UInt8,
    `name`        String,
    `description` String
)
PRIMARY KEY proto
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/protocols.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(FLAT());
asnv4 - преобразование IPv4-адреса в номер автономной системы (ASN)
-- IPv4 network -> ASN lookup (IP_TRIE layout keyed by the `network` prefix),
-- loaded from ip-asn-ipv4.csv. `aso` falls back to '??' when absent.
CREATE DICTIONARY dictionaries.asnv4
(
    `network` String,
    `asn`     UInt32,
    `aso`     String DEFAULT '??'
)
PRIMARY KEY network
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/ip-asn-ipv4.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE());
asnv6 - преобразование IPv6-адреса в номер автономной системы (ASN)
-- IPv6 network -> ASN lookup (IP_TRIE layout keyed by the `network` prefix),
-- loaded from ip-asn-ipv6.csv. `aso` falls back to '??' when absent.
CREATE DICTIONARY dictionaries.asnv6
(
    `network` String,
    `asn`     UInt32,
    `aso`     String DEFAULT '??'
)
PRIMARY KEY network
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/ip-asn-ipv6.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE());
contryen
-- geoname_id -> country attributes lookup, loaded from a CSV file.
-- The key is a UInt64 (the previous type `id` is not a ClickHouse data type);
-- this matches the HASHED layout and the toUInt64(...) keys used by the
-- dictGetString('dictionaries.contryen', ...) calls in traffic.kafka_to_raw.
-- NOTE(review): the file name is spelled 'coutry-location.csv' - confirm it
-- matches the file actually present in user_files.
CREATE DICTIONARY dictionaries.contryen
(
    `geoname_id`           UInt64,
    `locale_code`          String,
    `continent_code`       String,
    `continent_name`       String,
    `country_iso_code`     String,
    `country_name`         String,
    `is_in_european_union` UInt8
)
PRIMARY KEY geoname_id
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/coutry-location.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(HASHED());
contryv4
-- IPv4 network -> geoname id lookup (IP_TRIE layout keyed by the `network`
-- prefix), loaded from country-ipv4.csv. Every attribute defaults to 0.
CREATE DICTIONARY dictionaries.contryv4
(
    `network`                        String,
    `geoname_id`                     UInt32 DEFAULT 0,
    `registered_country_geoname_id`  UInt32 DEFAULT 0,
    `represented_country_geoname_id` UInt32 DEFAULT 0,
    `is_anonymous_proxy`             UInt8 DEFAULT 0,
    `is_satellite_provider`          UInt8 DEFAULT 0
)
PRIMARY KEY network
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/country-ipv4.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE());
contryv6
-- IPv6 network -> geoname id lookup (IP_TRIE layout keyed by the `network`
-- prefix), loaded from country-ipv6.csv. Every attribute defaults to 0.
CREATE DICTIONARY dictionaries.contryv6
(
    `network`                        String,
    `geoname_id`                     UInt32 DEFAULT 0,
    `registered_country_geoname_id`  UInt32 DEFAULT 0,
    `represented_country_geoname_id` UInt32 DEFAULT 0,
    `is_anonymous_proxy`             UInt8 DEFAULT 0,
    `is_satellite_provider`          UInt8 DEFAULT 0
)
PRIMARY KEY network
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/country-ipv6.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE());
switches
-- Switch IP (string key) -> switch name lookup, loaded from switches.csv.
-- A string key requires the COMPLEX_KEY_HASHED layout.
CREATE DICTIONARY dictionaries.switches
(
    `ip`   String,
    `name` String
)
PRIMARY KEY ip
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/switches.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(COMPLEX_KEY_HASHED());