User Tools

Site Tools


giganet:sflow

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision Previous revision
Next revision
Previous revision
giganet:sflow [2021/07/21 14:30]
rb
giganet:sflow [2021/08/03 13:28] (current)
rb
Line 1: Line 1:
-====== Схема мониторинга sflow статистики ======+===== Общая схема мониторинга =====
  
-==== Задействованные сервера ====+<code> 
 +switch sflow -> goflow2 -> kafka -> clickhouse
  
-  * **goflow **- сервер на котором крутится kafka и goflow2 приложение по сбору статистики +</code>
-  * **clickhouse **- сервер с базой данных clickhouse, хранение и обработка статистики +
-==== Задействованные приложения ====+
  
-  * **goflow2** +  - switch отправляет данные sflow на UDP порт 6343 
-  * **kafka** +  - goflow2 принимает поток данных, разбирает данные и сформировав protobuf отправляет в kafka 
-  * **clickhouse** +  kafka просто получает данные, данные в ней хранятся на протяжении 12 часов 
-==== Общая схема мониторинга ====+  clickhouse 
 +      - kafka - таблица подключена на чтение в kafka, данные читаются только 1 раз 
 +      - kafka_to_raw - представление которое выбирает данные с kafka и добавляя нужные поля вставляет в raw 
 +      - raw - таблица с остаточными сырыми данными 
 + 
 +==== switch ==== 
 + 
 +примеры конфигов для свитчей 
 + 
 +<code> 
 +sampling.1G = 8192 
 +sampling.10G = 8192 
 +sampling.40G = 16384 
 +sampling.100G = 32768 
 + 
 +</code> 
 + 
 +==== goflow2 ==== 
 + 
 +file /etc/default/goflow2 
 + 
 +<code> 
 +GOFLOW2_ARGS=-format pb -format.protobuf.fixedlen=true -listen=sflow://172.17.172.111:6343 -reuseport=true -transport=kafka -transport.kafka.brokers=172.17.172.111:9092 -transport.kafka.hashing=true -transport.kafka.log.err=true -transport.kafka.topic=sflow -reuseport=true -workers=2 
 + 
 +</code> 
 + 
 +==== kafka ==== 
 + 
 +== Установка == 
 + 
 +<code> 
 +useradd kafka -m 
 +passwd kafka 
 +su -l kafka 
 +cd 
 +curl https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz -O kafka_2.13-2.8.0.tgz 
 +tar -xvzf kafka_2.13-2.8.0.tgz 
 +ln -s  /home/kafka/kafka_2.13-2.8.0 /home/kafka/kafka 
 + 
 +</code> 
 + 
 +/home/kafka/kafka/config/server.properties 
 + 
 +<code> 
 +delete.topic.enable=true 
 + 
 +</code> 
 + 
 +/etc/systemd/system/zookeeper.service 
 + 
 +<code> 
 +[Unit] 
 +Requires=network.target remote-fs.target 
 +After=network.target remote-fs.target 
 + 
 +[Service] 
 +Type=simple 
 +User=kafka 
 +ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties 
 +ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh 
 +Restart=on-abnormal 
 + 
 +[Install] 
 +WantedBy=multi-user.target 
 + 
 +</code> 
 + 
 +/etc/systemd/system/kafka.service 
 + 
 +<code> 
 +[Unit] 
 +Requires=zookeeper.service 
 +After=zookeeper.service 
 + 
 +[Service] 
 +Type=simple 
 +User=kafka 
 +ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties> /home/kafka/kafka/kafka.log 2>&1' 
 +ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh 
 +Restart=on-abnormal 
 + 
 +[Install] 
 +WantedBy=multi-user.target 
 + 
 +</code> 
 + 
 +Запускаем сервисы 
 + 
 +<code> 
 +systemctl daemon-reload 
 +systemctl start kafka 
 +systemctl enable kafka 
 + 
 +</code> 
 + 
 +Работаем с kafka 
 + 
 +<code> 
 +/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list 
 +/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe "test-1" 
 +/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-1 
 +/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --config compression.type=producer --config cleanup.policy=delete --topic sflow --partitions 10 --replication-factor 1 
 + 
 +</code> 
 + 
 +==== clickhouse ==== 
 + 
 +=== дополнительная информация === 
 + 
 +[[https://github.com/netsampler/goflow2/blob/main/pb/flow.proto|https://github.com/netsampler/goflow2/blob/main/pb/flow.proto]] \\ 
 +[[https://clickhouse.tech/docs/ru/engines/table-engines/integrations/kafka/|https://clickhouse.tech/docs/ru/engines/table-engines/integrations/kafka/]] \\ 
 +[[https://github.com/AlexeyKupershtokh/clickhouse-maxmind-geoip/|https://github.com/AlexeyKupershtokh/clickhouse-maxmind-geoip/]] \\ 
 +[[https://gist.github.com/sanchezzzhak/511fd140e8809857f8f1d84ddb937015|https://gist.github.com/sanchezzzhak/511fd140e8809857f8f1d84ddb937015]] 
 + 
 +=== таблица kafka === 
 +<code> 
 + 
 +CREATE TABLE traffic.kafka 
 +
 +    `TimeReceived` UInt64, 
 +    `SequenceNum` UInt64, 
 +    `SamplingRate` UInt64, 
 +    `FlowDirection` UInt32, 
 + 
 +    `SamplerAddress` FixedString(16), 
 + 
 +    `TimeFlowStart` UInt64, 
 +    `TimeFlowEnd` UInt64, 
 + 
 +    `Bytes` UInt64, 
 +    `Packets` UInt64, 
 + 
 +    `SrcAddr` FixedString(16), 
 +    `DstAddr` FixedString(16), 
 + 
 +    `EType` UInt32, 
 +    `Proto` UInt32, 
 + 
 +    `SrcPort` UInt32, 
 +    `DstPort` UInt32, 
 + 
 +    `InIf` UInt32, 
 +    `OutIf` UInt32, 
 + 
 +    `SrcMac` String, 
 +    `DstMac` String, 
 + 
 +    `SrcVlan` UInt32, 
 +    `DstVlan` UInt32, 
 +    `VlanId` UInt32, 
 + 
 +    `IngressVrfID` UInt32, 
 +    `EgressVrfID` UInt32, 
 + 
 +    `IPTos` UInt32, 
 +    `ForwardingStatus` UInt32, 
 +    `IPTTL` UInt32, 
 +    `TCPFlags` UInt32, 
 +    `IcmpType` UInt32, 
 +    `IcmpCode` UInt32, 
 +    `IPv6FlowLabel` UInt32, 
 + 
 +    `FragmentId` UInt32, 
 +    `FragmentOffset` UInt32, 
 +    `BiFlowDirection` UInt32, 
 + 
 +    `SrcAs` UInt32, 
 +    `DstAs` UInt32, 
 + 
 +    `NextHop` FixedString(16), 
 +    `NextHopAS` UInt32, 
 + 
 +    `SrcNet` UInt32, 
 +    `DstNet` UInt32, 
 + 
 +    `HasMPLS` UInt8, 
 +    `MPLSCount` UInt32, 
 +    `MPLS1TTL` UInt32, 
 +    `MPLS1Label` UInt32, 
 +    `MPLS2TTL` UInt32, 
 +    `MPLS2Label` UInt32, 
 +    `MPLS3TTL` UInt32, 
 +    `MPLS3Label` UInt32, 
 +    `MPLSLastTTL` UInt32, 
 +    `MPLSLastLabel` UInt32 
 +
 +ENGINE Kafka 
 +SETTINGS 
 +    kafka_broker_list = '172.17.172.111:9092', 
 +    kafka_topic_list = 'sflow', 
 +    kafka_group_name = 'clickhouse', 
 +    kafka_format = 'Protobuf', 
 +    kafka_schema = 'flow.proto:FlowMessage', 
 +    kafka_num_consumers = 6 
 + 
 +</code> 
 + 
 +==== таблица raw ==== 
 + 
 +<code> 
 +CREATE TABLE traffic.raw 
 +
 +    `Date` Date, 
 +    `Datetime` DateTime, 
 + 
 +    `SequenceNum` UInt64, 
 +    `SamplingRate` UInt64, 
 +    `FlowDirection` UInt32, 
 + 
 +    `SamplerAddress` String, 
 + 
 +    `TimeFlowStart` DateTime, 
 +    `TimeFlowEnd` DateTime, 
 + 
 +    `Bytes` UInt64, 
 +    `Packets` UInt64, 
 + 
 +    `SrcIp` String, 
 +    `DstIp` String, 
 + 
 +    `EType` UInt32, 
 +    `Proto` String, 
 + 
 +    `SrcPort` UInt32, 
 +    `DstPort` UInt32, 
 + 
 +    `InIf` UInt32, 
 +    `OutIf` UInt32, 
 + 
 +    `SrcMac` String, 
 +    `DstMac` String, 
 + 
 +    `SrcVlan` UInt32, 
 +    `DstVlan` UInt32, 
 +    `VlanId` UInt32, 
 + 
 +    `IngressVrfID` UInt32, 
 +    `EgressVrfID` UInt32, 
 + 
 +    `IPTos` UInt32, 
 +    `ForwardingStatus` UInt32, 
 +    `IPTTL` UInt32, 
 +    `TCPFlags` UInt32, 
 +    `IcmpType` UInt32, 
 +    `IcmpCode` UInt32, 
 +    `IPv6FlowLabel` UInt32, 
 + 
 +    `FragmentId` UInt32, 
 +    `FragmentOffset` UInt32, 
 +    `BiFlowDirection` UInt32, 
 + 
 +    `SrcGeo` UInt32, 
 +    `DstGeo` UInt32, 
 +    `SrcAs` UInt32, 
 +    `DstAs` UInt32, 
 + 
 +    `SrcCountry` String, 
 +    `DstCountry` String, 
 + 
 +    `NextHop` String, 
 +    `NextHopAS` UInt32, 
 + 
 +    `SrcNet` String, 
 +    `DstNet` String, 
 + 
 +    `HasMPLS` UInt8, 
 +    `MPLSCount` UInt32, 
 +    `MPLS1TTL` UInt32, 
 +    `MPLS1Label` UInt32, 
 +    `MPLS2TTL` UInt32, 
 +    `MPLS2Label` UInt32, 
 +    `MPLS3TTL` UInt32, 
 +    `MPLS3Label` UInt32, 
 +    `MPLSLastTTL` UInt32, 
 +    `MPLSLastLabel` UInt32) 
 +ENGINE = MergeTree 
 +PARTITION BY toYYYYMMDD(Date) 
 +ORDER BY Datetime 
 +TTL Date   toIntervalDay(7) 
 +SETTINGS index_granularity = 8192 
 + 
 +</code> 
 + 
 +==== таблица kafka_to_raw ==== 
 + 
 +<code> 
 +CREATE MATERIALIZED VIEW traffic.kafka_to_raw TO traffic.raw 
 +
 +    `Date` Date, 
 +    `Datetime` DateTime, 
 + 
 +    `SequenceNum` UInt64, 
 +    `SamplingRate` UInt64, 
 +    `FlowDirection` UInt32, 
 + 
 +    `SamplerAddress` String, 
 + 
 +    `TimeFlowStart` DateTime, 
 +    `TimeFlowEnd` DateTime, 
 + 
 +    `Bytes` UInt64, 
 +    `Packets` UInt64, 
 + 
 +    `SrcIp` String, 
 +    `DstIp` String, 
 + 
 +    `EType` UInt32, 
 +    `Proto` String, 
 + 
 +    `SrcPort` UInt32, 
 +    `DstPort` UInt32, 
 + 
 +    `InIf` UInt32, 
 +    `OutIf` UInt32, 
 + 
 +    `SrcMac` String, 
 +    `DstMac` String, 
 + 
 +    `SrcVlan` UInt32, 
 +    `DstVlan` UInt32, 
 +    `VlanId` UInt32, 
 + 
 +    `IngressVrfID` UInt32, 
 +    `EgressVrfID` UInt32, 
 + 
 +    `IPTos` UInt32, 
 +    `ForwardingStatus` UInt32, 
 +    `IPTTL` UInt32, 
 +    `TCPFlags` UInt32, 
 +    `IcmpType` UInt32, 
 +    `IcmpCode` UInt32, 
 +    `IPv6FlowLabel` UInt32, 
 + 
 +    `FragmentId` UInt32, 
 +    `FragmentOffset` UInt32, 
 +    `BiFlowDirection` UInt32, 
 + 
 +    `SrcAs` UInt32, 
 +    `DstAs` UInt32, 
 + 
 +    `SrcCountry` String, 
 +    `DstCountry` String, 
 + 
 +    `NextHop` String, 
 +    `NextHopAS` UInt32, 
 + 
 +    `SrcNet` String, 
 +    `DstNet` String, 
 + 
 +    `HasMPLS` UInt8, 
 +    `MPLSCount` UInt32, 
 +    `MPLS1TTL` UInt32, 
 +    `MPLS1Label` UInt32, 
 +    `MPLS2TTL` UInt32, 
 +    `MPLS2Label` UInt32, 
 +    `MPLS3TTL` UInt32, 
 +    `MPLS3Label` UInt32, 
 +    `MPLSLastTTL` UInt32, 
 +    `MPLSLastLabel` UInt32 
 +) AS 
 +SELECT 
 +    toDate(TimeReceived) AS Date, 
 +    TimeReceived as Datetime, 
 +    IPv4NumToString(reinterpretAsUInt32(substring(reverse(SamplerAddress), 13, 4))) AS SamplerAddress, 
 +    Bytes * SamplingRate as Bytes, 
 +    Packets * SamplingRate as Packets, 
 +    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))), 
 +            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(SrcAddr)), 0) AS SrcAs, 
 +    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))), 
 +            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(DstAddr)), 0) AS DstAs, 
 +    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))), 
 +            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(SrcAddr)), 0) AS SrcGeo, 
 +    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))), 
 +            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(DstAddr)), 0) AS DstGeo, 
 +    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))), 
 +            EType = 0x86DD, IPv6NumToString(SrcAddr), '') AS SrcIp, 
 +    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))), 
 +            EType = 0x86DD, IPv6NumToString(DstAddr), '') AS DstIp, 
 +    dictGet('dictionaries.protocols', 'name', toUInt64(Proto)) AS Proto, 
 +    MACNumToString(toUInt64(SrcMac)) AS SrcMac, 
 +    MACNumToString(toUInt64(DstMac)) AS DstMac, 
 +    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(SrcGeo)) AS SrcCountry, 
 +    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(DstGeo)) AS DstCountry, 
 + 
 +    SequenceNum, 
 +    SamplingRate, 
 +    FlowDirection, 
 +    TimeFlowStart, 
 +    TimeFlowEnd, 
 +    EType, 
 +    SrcPort, 
 +    DstPort, 
 +    InIf, 
 +    OutIf, 
 +    SrcVlan, 
 +    DstVlan, 
 +    VlanId, 
 +    IngressVrfID, 
 +    EgressVrfID, 
 +    IPTos, 
 +    ForwardingStatus, 
 +    IPTTL, 
 +    TCPFlags, 
 +    IcmpType, 
 +    IcmpCode, 
 +    IPv6FlowLabel, 
 +    FragmentId, 
 +    FragmentOffset, 
 +    BiFlowDirection, 
 +    NextHop, 
 +    NextHopAS, 
 +    SrcNet, 
 +    DstNet, 
 +    HasMPLS, 
 +    MPLSCount, 
 +    MPLS1TTL, 
 +    MPLS1Label, 
 +    MPLS2TTL, 
 +    MPLS2Label, 
 +    MPLS3TTL, 
 +    MPLS3Label, 
 +    MPLSLastTTL, 
 +    MPLSLastLabel 
 +FROM traffic.kafka 
 + 
 +</code> 
 + 
 +==== Словари ==== 
 + 
 +protocols - словарь для текстового представления протоколов 
 + 
 +<code> 
 +CREATE DICTIONARY dictionaries.protocols ( 
 +    `proto` UInt8, 
 +    `name` String, 
 +    `description` String 
 +
 +PRIMARY KEY proto 
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/protocols.csv' FORMAT 'CSVWithNames')) 
 +LIFETIME(MIN 0 MAX 3600) 
 +LAYOUT(FLAT()) 
 + 
 +</code> 
 + 
 +asnv4 - конвертация ipv4 к ASN 
 + 
 +<code> 
 +CREATE DICTIONARY dictionaries.asnv4 
 +
 +    `network` String, 
 +    `asn` UInt32, 
 +    `aso` String DEFAULT '??' 
 +
 +PRIMARY KEY network 
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/ip-asn-ipv4.csv' FORMAT 'CSVWithNames')) 
 +LIFETIME(MIN 0 MAX 3600) 
 +LAYOUT(IP_TRIE()) 
 + 
 +</code> 
 + 
 +asnv6 - конвертация ipv6 к ASN 
 + 
 +<code> 
 +CREATE DICTIONARY dictionaries.asnv6 
 +
 +    `network` String, 
 +    `asn` UInt32, 
 +    `aso` String DEFAULT '??' 
 +
 +PRIMARY KEY network 
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/ip-asn-ipv6.csv' FORMAT 'CSVWithNames')) 
 +LIFETIME(MIN 0 MAX 3600) 
 +LAYOUT(IP_TRIE()) 
 + 
 +</code> 
 + 
 +contryen 
 + 
 +<code> 
 +CREATE DICTIONARY dictionaries.contryen 
 +
 +    `geoname_id` id, 
 +    `locale_code` String, 
 +    `continent_code` String, 
 +    `continent_name` String, 
 +    `country_iso_code` String, 
 +    `country_name` String, 
 +    `is_in_european_union` UInt8 
 +
 +PRIMARY KEY geoname_id 
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/coutry-location.csv' FORMAT 'CSVWithNames')) 
 +LIFETIME(MIN 0 MAX 3600) 
 +LAYOUT(HASHED()) 
 + 
 +</code> 
 + 
 +contryv4 
 + 
 +<code> 
 +CREATE DICTIONARY dictionaries.contryv4 
 +
 +    `network` String, 
 +    `geoname_id` UInt32 DEFAULT 0, 
 +    `registered_country_geoname_id` UInt32 DEFAULT 0, 
 +    `represented_country_geoname_id` UInt32 DEFAULT 0, 
 +    `is_anonymous_proxy` UInt8 DEFAULT 0, 
 +    `is_satellite_provider` UInt8 DEFAULT 0 
 +
 +PRIMARY KEY network 
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/country-ipv4.csv' FORMAT 'CSVWithNames')) 
 +LIFETIME(MIN 0 MAX 3600) 
 +LAYOUT(IP_TRIE()) 
 + 
 +</code> 
 + 
 +contryv6 
 + 
 +<code> 
 +CREATE DICTIONARY dictionaries.contryv6 
 +
 +    `network` String, 
 +    `geoname_id` UInt32 DEFAULT 0, 
 +    `registered_country_geoname_id` UInt32 DEFAULT 0, 
 +    `represented_country_geoname_id` UInt32 DEFAULT 0, 
 +    `is_anonymous_proxy` UInt8 DEFAULT 0, 
 +    `is_satellite_provider` UInt8 DEFAULT 0 
 +
 +PRIMARY KEY network 
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/country-ipv6.csv' FORMAT 'CSVWithNames')) 
 +LIFETIME(MIN 0 MAX 3600) 
 +LAYOUT(IP_TRIE()) 
 + 
 +</code> 
 + 
 +switches 
 + 
 +<code> 
 +CREATE DICTIONARY dictionaries.switches 
 +
 +    `ip` String, 
 +    `name` String 
 +
 +PRIMARY KEY ip 
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/switches.csv' FORMAT 'CSVWithNames')) 
 +LIFETIME(MIN 0 MAX 3600) 
 +LAYOUT(COMPLEX_KEY_HASHED()) 
 + 
 +</code>
  
  
giganet/sflow.1626867018.txt.gz · Last modified: 2021/07/21 14:30 by rb