User Tools

Site Tools


giganet:sflow

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision Previous revision
Next revision
Previous revision
giganet:sflow [2021/08/02 13:04]
rb
giganet:sflow [2021/08/03 13:28] (current)
rb
Line 1: Line 1:
-====== Схема мониторинга sflow статистики ====== +===== Общая схема мониторинга =====
- +
-==== Задействованные сервера ==== +
- +
-  * **goflow **- сервер на котором крутится kafka и goflow2 приложение по сбору статистики +
-  * **clickhouse **- сервер с базой данных clickhouse, хранение и обработка статистики +
-==== Задействованные приложения ==== +
- +
-  * **goflow2** +
-  * **kafka** +
-  * **clickhouse** +
-==== Общая схема мониторинга ====+
  
 <code> <code>
Line 16: Line 5:
  
 </code> </code>
 +
   - switch отправляет данные sflow на UDP порт 6343   - switch отправляет данные sflow на UDP порт 6343
   - goflow2 принимает поток данных, разбирает данные и сформировав protobuf отправляет в kafka   - goflow2 принимает поток данных, разбирает данные и сформировав protobuf отправляет в kafka
Line 23: Line 13:
       - kafka_to_raw - представление которое выбирает данные с kafka и добавляя нужные поля вставляет в raw       - kafka_to_raw - представление которое выбирает данные с kafka и добавляя нужные поля вставляет в raw
       - raw - таблица с остаточными сырыми данными       - raw - таблица с остаточными сырыми данными
 +
 +==== switch ====
 +
 +примеры конфигов для свитчей
 +
 +<code>
 +sampling.1G = 8192
 +sampling.10G = 8192
 +sampling.40G = 16384
 +sampling.100G = 32768
 +
 +</code>
 +
 +==== goflow2 ====
 +
 +file /etc/default/goflow2
 +
 +<code>
 +GOFLOW2_ARGS=-format pb -format.protobuf.fixedlen=true -listen=sflow://172.17.172.111:6343 -reuseport=true -transport=kafka -transport.kafka.brokers=172.17.172.111:9092 -transport.kafka.hashing=true -transport.kafka.log.err=true -transport.kafka.topic=sflow -reuseport=true -workers=2
 +
 +</code>
 +
 +==== kafka ====
 +
 +== Установка ==
 +
 +<code>
 +useradd kafka -m
 +passwd kafka
 +su -l kafka
 +cd
 +curl https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz -O kafka_2.13-2.8.0.tgz
 +tar -xvzf kafka_2.13-2.8.0.tgz
 +ln -s  /home/kafka/kafka_2.13-2.8.0 /home/kafka/kafka
 +
 +</code>
 +
 +/home/kafka/kafka/config/server.properties
 +
 +<code>
 +delete.topic.enable=true
 +
 +</code>
 +
 +/etc/systemd/system/zookeeper.service
 +
 +<code>
 +[Unit]
 +Requires=network.target remote-fs.target
 +After=network.target remote-fs.target
 +
 +[Service]
 +Type=simple
 +User=kafka
 +ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
 +ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
 +Restart=on-abnormal
 +
 +[Install]
 +WantedBy=multi-user.target
 +
 +</code>
 +
 +/etc/systemd/system/kafka.service
 +
 +<code>
 +[Unit]
 +Requires=zookeeper.service
 +After=zookeeper.service
 +
 +[Service]
 +Type=simple
 +User=kafka
 +ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties> /home/kafka/kafka/kafka.log 2>&1'
 +ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
 +Restart=on-abnormal
 +
 +[Install]
 +WantedBy=multi-user.target
 +
 +</code>
 +
 +Запускаем сервисы
 +
 +<code>
 +systemctl daemon-reload
 +systemctl start kafka
 +systemctl enable kafka
 +
 +</code>
 +
 +Работаем с kafka
 +
 +<code>
 +/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
 +/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe "test-1"
 +/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-1
 +/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --config compression.type=producer --config cleanup.policy=delete --topic sflow --partitions 10 --replication-factor 1
 +
 +</code>
 +
 +==== clickhouse ====
 +
 +=== дополнительная информация ===
 +
 +[[https://github.com/netsampler/goflow2/blob/main/pb/flow.proto|https://github.com/netsampler/goflow2/blob/main/pb/flow.proto]] \\
 +[[https://clickhouse.tech/docs/ru/engines/table-engines/integrations/kafka/|https://clickhouse.tech/docs/ru/engines/table-engines/integrations/kafka/]] \\
 +[[https://github.com/AlexeyKupershtokh/clickhouse-maxmind-geoip/|https://github.com/AlexeyKupershtokh/clickhouse-maxmind-geoip/]] \\
 +[[https://gist.github.com/sanchezzzhak/511fd140e8809857f8f1d84ddb937015|https://gist.github.com/sanchezzzhak/511fd140e8809857f8f1d84ddb937015]]
 +
 +=== таблица kafka ===
 +<code>
 +
 +CREATE TABLE traffic.kafka
 +(
 +    `TimeReceived` UInt64,
 +    `SequenceNum` UInt64,
 +    `SamplingRate` UInt64,
 +    `FlowDirection` UInt32,
 +
 +    `SamplerAddress` FixedString(16),
 +
 +    `TimeFlowStart` UInt64,
 +    `TimeFlowEnd` UInt64,
 +
 +    `Bytes` UInt64,
 +    `Packets` UInt64,
 +
 +    `SrcAddr` FixedString(16),
 +    `DstAddr` FixedString(16),
 +
 +    `EType` UInt32,
 +    `Proto` UInt32,
 +
 +    `SrcPort` UInt32,
 +    `DstPort` UInt32,
 +
 +    `InIf` UInt32,
 +    `OutIf` UInt32,
 +
 +    `SrcMac` String,
 +    `DstMac` String,
 +
 +    `SrcVlan` UInt32,
 +    `DstVlan` UInt32,
 +    `VlanId` UInt32,
 +
 +    `IngressVrfID` UInt32,
 +    `EgressVrfID` UInt32,
 +
 +    `IPTos` UInt32,
 +    `ForwardingStatus` UInt32,
 +    `IPTTL` UInt32,
 +    `TCPFlags` UInt32,
 +    `IcmpType` UInt32,
 +    `IcmpCode` UInt32,
 +    `IPv6FlowLabel` UInt32,
 +
 +    `FragmentId` UInt32,
 +    `FragmentOffset` UInt32,
 +    `BiFlowDirection` UInt32,
 +
 +    `SrcAs` UInt32,
 +    `DstAs` UInt32,
 +
 +    `NextHop` FixedString(16),
 +    `NextHopAS` UInt32,
 +
 +    `SrcNet` UInt32,
 +    `DstNet` UInt32,
 +
 +    `HasMPLS` UInt8,
 +    `MPLSCount` UInt32,
 +    `MPLS1TTL` UInt32,
 +    `MPLS1Label` UInt32,
 +    `MPLS2TTL` UInt32,
 +    `MPLS2Label` UInt32,
 +    `MPLS3TTL` UInt32,
 +    `MPLS3Label` UInt32,
 +    `MPLSLastTTL` UInt32,
 +    `MPLSLastLabel` UInt32
 +)
 +ENGINE = Kafka
 +SETTINGS
 +    kafka_broker_list = '172.17.172.111:9092',
 +    kafka_topic_list = 'sflow',
 +    kafka_group_name = 'clickhouse',
 +    kafka_format = 'Protobuf',
 +    kafka_schema = 'flow.proto:FlowMessage',
 +    kafka_num_consumers = 6
 +
 +</code>
 +
 +==== таблица raw ====
 +
 +<code>
 +CREATE TABLE traffic.raw
 +(
 +    `Date` Date,
 +    `Datetime` DateTime,
 +
 +    `SequenceNum` UInt64,
 +    `SamplingRate` UInt64,
 +    `FlowDirection` UInt32,
 +
 +    `SamplerAddress` String,
 +
 +    `TimeFlowStart` DateTime,
 +    `TimeFlowEnd` DateTime,
 +
 +    `Bytes` UInt64,
 +    `Packets` UInt64,
 +
 +    `SrcIp` String,
 +    `DstIp` String,
 +
 +    `EType` UInt32,
 +    `Proto` String,
 +
 +    `SrcPort` UInt32,
 +    `DstPort` UInt32,
 +
 +    `InIf` UInt32,
 +    `OutIf` UInt32,
 +
 +    `SrcMac` String,
 +    `DstMac` String,
 +
 +    `SrcVlan` UInt32,
 +    `DstVlan` UInt32,
 +    `VlanId` UInt32,
 +
 +    `IngressVrfID` UInt32,
 +    `EgressVrfID` UInt32,
 +
 +    `IPTos` UInt32,
 +    `ForwardingStatus` UInt32,
 +    `IPTTL` UInt32,
 +    `TCPFlags` UInt32,
 +    `IcmpType` UInt32,
 +    `IcmpCode` UInt32,
 +    `IPv6FlowLabel` UInt32,
 +
 +    `FragmentId` UInt32,
 +    `FragmentOffset` UInt32,
 +    `BiFlowDirection` UInt32,
 +
 +    `SrcGeo` UInt32,
 +    `DstGeo` UInt32,
 +    `SrcAs` UInt32,
 +    `DstAs` UInt32,
 +
 +    `SrcCountry` String,
 +    `DstCountry` String,
 +
 +    `NextHop` String,
 +    `NextHopAS` UInt32,
 +
 +    `SrcNet` String,
 +    `DstNet` String,
 +
 +    `HasMPLS` UInt8,
 +    `MPLSCount` UInt32,
 +    `MPLS1TTL` UInt32,
 +    `MPLS1Label` UInt32,
 +    `MPLS2TTL` UInt32,
 +    `MPLS2Label` UInt32,
 +    `MPLS3TTL` UInt32,
 +    `MPLS3Label` UInt32,
 +    `MPLSLastTTL` UInt32,
 +    `MPLSLastLabel` UInt32)
 +ENGINE = MergeTree
 +PARTITION BY toYYYYMMDD(Date)
 +ORDER BY Datetime
 +TTL Date   toIntervalDay(7)
 +SETTINGS index_granularity = 8192
 +
 +</code>
 +
 +==== таблица kafka_to_raw ====
 +
 +<code>
 +CREATE MATERIALIZED VIEW traffic.kafka_to_raw TO traffic.raw
 +(
 +    `Date` Date,
 +    `Datetime` DateTime,
 +
 +    `SequenceNum` UInt64,
 +    `SamplingRate` UInt64,
 +    `FlowDirection` UInt32,
 +
 +    `SamplerAddress` String,
 +
 +    `TimeFlowStart` DateTime,
 +    `TimeFlowEnd` DateTime,
 +
 +    `Bytes` UInt64,
 +    `Packets` UInt64,
 +
 +    `SrcIp` String,
 +    `DstIp` String,
 +
 +    `EType` UInt32,
 +    `Proto` String,
 +
 +    `SrcPort` UInt32,
 +    `DstPort` UInt32,
 +
 +    `InIf` UInt32,
 +    `OutIf` UInt32,
 +
 +    `SrcMac` String,
 +    `DstMac` String,
 +
 +    `SrcVlan` UInt32,
 +    `DstVlan` UInt32,
 +    `VlanId` UInt32,
 +
 +    `IngressVrfID` UInt32,
 +    `EgressVrfID` UInt32,
 +
 +    `IPTos` UInt32,
 +    `ForwardingStatus` UInt32,
 +    `IPTTL` UInt32,
 +    `TCPFlags` UInt32,
 +    `IcmpType` UInt32,
 +    `IcmpCode` UInt32,
 +    `IPv6FlowLabel` UInt32,
 +
 +    `FragmentId` UInt32,
 +    `FragmentOffset` UInt32,
 +    `BiFlowDirection` UInt32,
 +
 +    `SrcAs` UInt32,
 +    `DstAs` UInt32,
 +
 +    `SrcCountry` String,
 +    `DstCountry` String,
 +
 +    `NextHop` String,
 +    `NextHopAS` UInt32,
 +
 +    `SrcNet` String,
 +    `DstNet` String,
 +
 +    `HasMPLS` UInt8,
 +    `MPLSCount` UInt32,
 +    `MPLS1TTL` UInt32,
 +    `MPLS1Label` UInt32,
 +    `MPLS2TTL` UInt32,
 +    `MPLS2Label` UInt32,
 +    `MPLS3TTL` UInt32,
 +    `MPLS3Label` UInt32,
 +    `MPLSLastTTL` UInt32,
 +    `MPLSLastLabel` UInt32
 +) AS
 +SELECT
 +    toDate(TimeReceived) AS Date,
 +    TimeReceived as Datetime,
 +    IPv4NumToString(reinterpretAsUInt32(substring(reverse(SamplerAddress), 13, 4))) AS SamplerAddress,
 +    Bytes * SamplingRate as Bytes,
 +    Packets * SamplingRate as Packets,
 +    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
 +            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(SrcAddr)), 0) AS SrcAs,
 +    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
 +            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(DstAddr)), 0) AS DstAs,
 +    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
 +            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(SrcAddr)), 0) AS SrcGeo,
 +    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
 +            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(DstAddr)), 0) AS DstGeo,
 +    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
 +            EType = 0x86DD, IPv6NumToString(SrcAddr), '') AS SrcIp,
 +    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
 +            EType = 0x86DD, IPv6NumToString(DstAddr), '') AS DstIp,
 +    dictGet('dictionaries.protocols', 'name', toUInt64(Proto)) AS Proto,
 +    MACNumToString(toUInt64(SrcMac)) AS SrcMac,
 +    MACNumToString(toUInt64(DstMac)) AS DstMac,
 +    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(SrcGeo)) AS SrcCountry,
 +    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(DstGeo)) AS DstCountry,
 +
 +    SequenceNum,
 +    SamplingRate,
 +    FlowDirection,
 +    TimeFlowStart,
 +    TimeFlowEnd,
 +    EType,
 +    SrcPort,
 +    DstPort,
 +    InIf,
 +    OutIf,
 +    SrcVlan,
 +    DstVlan,
 +    VlanId,
 +    IngressVrfID,
 +    EgressVrfID,
 +    IPTos,
 +    ForwardingStatus,
 +    IPTTL,
 +    TCPFlags,
 +    IcmpType,
 +    IcmpCode,
 +    IPv6FlowLabel,
 +    FragmentId,
 +    FragmentOffset,
 +    BiFlowDirection,
 +    NextHop,
 +    NextHopAS,
 +    SrcNet,
 +    DstNet,
 +    HasMPLS,
 +    MPLSCount,
 +    MPLS1TTL,
 +    MPLS1Label,
 +    MPLS2TTL,
 +    MPLS2Label,
 +    MPLS3TTL,
 +    MPLS3Label,
 +    MPLSLastTTL,
 +    MPLSLastLabel
 +FROM traffic.kafka
 +
 +</code>
 +
 +==== Словари ====
 +
 +protocols - словарь для текстового представления протоколов
 +
 +<code>
 +CREATE DICTIONARY dictionaries.protocols (
 +    `proto` UInt8,
 +    `name` String,
 +    `description` String
 +)
 +PRIMARY KEY proto
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/protocols.csv' FORMAT 'CSVWithNames'))
 +LIFETIME(MIN 0 MAX 3600)
 +LAYOUT(FLAT())
 +
 +</code>
 +
 +asnv4 - конвертация ipv4 к ASN
 +
 +<code>
 +CREATE DICTIONARY dictionaries.asnv4
 +(
 +    `network` String,
 +    `asn` UInt32,
 +    `aso` String DEFAULT '??'
 +)
 +PRIMARY KEY network
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/ip-asn-ipv4.csv' FORMAT 'CSVWithNames'))
 +LIFETIME(MIN 0 MAX 3600)
 +LAYOUT(IP_TRIE())
 +
 +</code>
 +
 +asnv6 - конвертация ipv6 к ASN
 +
 +<code>
 +CREATE DICTIONARY dictionaries.asnv6
 +(
 +    `network` String,
 +    `asn` UInt32,
 +    `aso` String DEFAULT '??'
 +)
 +PRIMARY KEY network
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/ip-asn-ipv6.csv' FORMAT 'CSVWithNames'))
 +LIFETIME(MIN 0 MAX 3600)
 +LAYOUT(IP_TRIE())
 +
 +</code>
 +
 +contryen
 +
 +<code>
 +CREATE DICTIONARY dictionaries.contryen
 +(
 +    `geoname_id` id,
 +    `locale_code` String,
 +    `continent_code` String,
 +    `continent_name` String,
 +    `country_iso_code` String,
 +    `country_name` String,
 +    `is_in_european_union` UInt8
 +)
 +PRIMARY KEY geoname_id
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/coutry-location.csv' FORMAT 'CSVWithNames'))
 +LIFETIME(MIN 0 MAX 3600)
 +LAYOUT(HASHED())
 +
 +</code>
 +
 +contryv4
 +
 +<code>
 +CREATE DICTIONARY dictionaries.contryv4
 +(
 +    `network` String,
 +    `geoname_id` UInt32 DEFAULT 0,
 +    `registered_country_geoname_id` UInt32 DEFAULT 0,
 +    `represented_country_geoname_id` UInt32 DEFAULT 0,
 +    `is_anonymous_proxy` UInt8 DEFAULT 0,
 +    `is_satellite_provider` UInt8 DEFAULT 0
 +)
 +PRIMARY KEY network
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/country-ipv4.csv' FORMAT 'CSVWithNames'))
 +LIFETIME(MIN 0 MAX 3600)
 +LAYOUT(IP_TRIE())
 +
 +</code>
 +
 +contryv6
 +
 +<code>
 +CREATE DICTIONARY dictionaries.contryv6
 +(
 +    `network` String,
 +    `geoname_id` UInt32 DEFAULT 0,
 +    `registered_country_geoname_id` UInt32 DEFAULT 0,
 +    `represented_country_geoname_id` UInt32 DEFAULT 0,
 +    `is_anonymous_proxy` UInt8 DEFAULT 0,
 +    `is_satellite_provider` UInt8 DEFAULT 0
 +)
 +PRIMARY KEY network
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/country-ipv6.csv' FORMAT 'CSVWithNames'))
 +LIFETIME(MIN 0 MAX 3600)
 +LAYOUT(IP_TRIE())
 +
 +</code>
 +
 +switches
 +
 +<code>
 +CREATE DICTIONARY dictionaries.switches
 +(
 +    `ip` String,
 +    `name` String
 +)
 +PRIMARY KEY ip
 +SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/switches.csv' FORMAT 'CSVWithNames'))
 +LIFETIME(MIN 0 MAX 3600)
 +LAYOUT(COMPLEX_KEY_HASHED())
 +
 +</code>
  
  
giganet/sflow.1627898691.txt.gz · Last modified: 2021/08/02 13:04 by rb