User Tools

Site Tools


giganet:sflow

This is an old revision of the document!


Схема мониторинга sflow статистики

Задействованные сервера

  • goflow - сервер, на котором работают kafka и приложение goflow2 для сбора статистики
  • clickhouse - сервер с базой данных clickhouse, хранение и обработка статистики

Задействованные приложения

  • goflow2
  • kafka
  • clickhouse

Общая схема мониторинга

switch sflow -> goflow2 -> kafka -> clickhouse
  1. switch отправляет данные sflow на UDP порт 6343
  2. goflow2 принимает поток данных, разбирает его и, сформировав protobuf, отправляет в kafka
  3. kafka просто получает данные, данные в ней хранятся на протяжении 12 часов
  4. clickhouse
    1. kafka - таблица, подключённая к kafka на чтение; данные читаются только один раз
    2. kafka_to_raw - представление, которое выбирает данные из kafka и, добавляя нужные поля, вставляет их в raw
    3. raw - таблица с остаточными сырыми данными

switch

примеры конфигов для свитчей

goflow2

file /etc/default/goflow2

# goflow2 flags: receive sFlow on 172.17.172.111:6343 and publish fixed-length protobuf messages to the Kafka topic "sflow".
GOFLOW2_ARGS=-format=pb -format.protobuf.fixedlen=true -listen=sflow://172.17.172.111:6343 -reuseport=true -workers=2 -transport=kafka -transport.kafka.brokers=172.17.172.111:9092 -transport.kafka.topic=sflow -transport.kafka.hashing=true -transport.kafka.log.err=true

kafka

# List all topics.
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
# Describe a single topic (the topic name must be passed via --topic).
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-1
# Delete a topic.
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-1
# Create the "sflow" topic: 10 partitions, no replication, producer-side compression, delete cleanup policy.
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --config compression.type=producer --config cleanup.policy=delete --topic sflow --partitions 10 --replication-factor 1

clickhouse

дополнительная информация

таблица kafka

-- Kafka-engine table: the ClickHouse consumer of the `sflow` topic.
-- Messages are decoded as Protobuf using the FlowMessage schema from
-- flow.proto; the column names presumably mirror that message's fields.
CREATE TABLE traffic.kafka
(
    -- sample metadata
    TimeReceived     UInt64,
    SequenceNum      UInt64,
    SamplingRate     UInt64,
    FlowDirection    UInt32,

    -- address of the device that exported the sample (16 raw bytes)
    SamplerAddress   FixedString(16),

    TimeFlowStart    UInt64,
    TimeFlowEnd      UInt64,

    -- sampled counters (not yet multiplied by SamplingRate)
    Bytes            UInt64,
    Packets          UInt64,

    -- source / destination addresses (16 raw bytes each)
    SrcAddr          FixedString(16),
    DstAddr          FixedString(16),

    EType            UInt32,
    Proto            UInt32,

    SrcPort          UInt32,
    DstPort          UInt32,

    InIf             UInt32,
    OutIf            UInt32,

    SrcMac           String,
    DstMac           String,

    SrcVlan          UInt32,
    DstVlan          UInt32,
    VlanId           UInt32,

    IngressVrfID     UInt32,
    EgressVrfID      UInt32,

    IPTos            UInt32,
    ForwardingStatus UInt32,
    IPTTL            UInt32,
    TCPFlags         UInt32,
    IcmpType         UInt32,
    IcmpCode         UInt32,
    IPv6FlowLabel    UInt32,

    FragmentId       UInt32,
    FragmentOffset   UInt32,
    BiFlowDirection  UInt32,

    SrcAs            UInt32,
    DstAs            UInt32,

    NextHop          FixedString(16),
    NextHopAS        UInt32,

    SrcNet           UInt32,
    DstNet           UInt32,

    -- MPLS label stack details
    HasMPLS          UInt8,
    MPLSCount        UInt32,
    MPLS1TTL         UInt32,
    MPLS1Label       UInt32,
    MPLS2TTL         UInt32,
    MPLS2Label       UInt32,
    MPLS3TTL         UInt32,
    MPLS3Label       UInt32,
    MPLSLastTTL      UInt32,
    MPLSLastLabel    UInt32
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list   = '172.17.172.111:9092',
    kafka_topic_list    = 'sflow',
    kafka_group_name    = 'clickhouse',
    kafka_format        = 'Protobuf',
    kafka_schema        = 'flow.proto:FlowMessage',
    kafka_num_consumers = 6

таблица raw

-- Storage table for the enriched flow samples written by the
-- traffic.kafka_to_raw materialized view.
-- One partition per day, rows sorted by Datetime, rows expire 7 days after Date.
CREATE TABLE traffic.raw
(
    Date             Date,
    Datetime         DateTime,

    SequenceNum      UInt64,
    SamplingRate     UInt64,
    FlowDirection    UInt32,

    -- exporter address in text form
    SamplerAddress   String,

    TimeFlowStart    DateTime,
    TimeFlowEnd      DateTime,

    Bytes            UInt64,
    Packets          UInt64,

    -- source / destination addresses in text form
    SrcIp            String,
    DstIp            String,

    EType            UInt32,
    -- protocol name (text), not the numeric protocol id
    Proto            String,

    SrcPort          UInt32,
    DstPort          UInt32,

    InIf             UInt32,
    OutIf            UInt32,

    SrcMac           String,
    DstMac           String,

    SrcVlan          UInt32,
    DstVlan          UInt32,
    VlanId           UInt32,

    IngressVrfID     UInt32,
    EgressVrfID      UInt32,

    IPTos            UInt32,
    ForwardingStatus UInt32,
    IPTTL            UInt32,
    TCPFlags         UInt32,
    IcmpType         UInt32,
    IcmpCode         UInt32,
    IPv6FlowLabel    UInt32,

    FragmentId       UInt32,
    FragmentOffset   UInt32,
    BiFlowDirection  UInt32,

    -- enrichment looked up from dictionaries
    SrcGeo           UInt32,
    DstGeo           UInt32,
    SrcAs            UInt32,
    DstAs            UInt32,

    SrcCountry       String,
    DstCountry       String,

    NextHop          String,
    NextHopAS        UInt32,

    SrcNet           String,
    DstNet           String,

    HasMPLS          UInt8,
    MPLSCount        UInt32,
    MPLS1TTL         UInt32,
    MPLS1Label       UInt32,
    MPLS2TTL         UInt32,
    MPLS2Label       UInt32,
    MPLS3TTL         UInt32,
    MPLS3Label       UInt32,
    MPLSLastTTL      UInt32,
    MPLSLastLabel    UInt32
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(Date)
ORDER BY Datetime
TTL Date + INTERVAL 7 DAY
SETTINGS index_granularity = 8192

таблица kafka_to_raw

-- Materialized view that forwards every block consumed from traffic.kafka
-- into traffic.raw, enriching it on the way:
--   * addresses are rendered as text,
--   * Bytes / Packets are multiplied by SamplingRate,
--   * ASN, geoname id, country code and protocol name are looked up in
--     the dictionaries.* dictionaries.
-- The declared column list includes SrcGeo / DstGeo so that it matches both
-- the SELECT below and the traffic.raw schema.
CREATE MATERIALIZED VIEW traffic.kafka_to_raw TO traffic.raw
(
    `Date` Date,
    `Datetime` DateTime,

    `SequenceNum` UInt64,
    `SamplingRate` UInt64,
    `FlowDirection` UInt32,

    `SamplerAddress` String,

    `TimeFlowStart` DateTime,
    `TimeFlowEnd` DateTime,

    `Bytes` UInt64,
    `Packets` UInt64,

    `SrcIp` String,
    `DstIp` String,

    `EType` UInt32,
    `Proto` String,

    `SrcPort` UInt32,
    `DstPort` UInt32,

    `InIf` UInt32,
    `OutIf` UInt32,

    `SrcMac` String,
    `DstMac` String,

    `SrcVlan` UInt32,
    `DstVlan` UInt32,
    `VlanId` UInt32,

    `IngressVrfID` UInt32,
    `EgressVrfID` UInt32,

    `IPTos` UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL` UInt32,
    `TCPFlags` UInt32,
    `IcmpType` UInt32,
    `IcmpCode` UInt32,
    `IPv6FlowLabel` UInt32,

    `FragmentId` UInt32,
    `FragmentOffset` UInt32,
    `BiFlowDirection` UInt32,

    `SrcGeo` UInt32,
    `DstGeo` UInt32,
    `SrcAs` UInt32,
    `DstAs` UInt32,

    `SrcCountry` String,
    `DstCountry` String,

    `NextHop` String,
    `NextHopAS` UInt32,

    `SrcNet` String,
    `DstNet` String,

    `HasMPLS` UInt8,
    `MPLSCount` UInt32,
    `MPLS1TTL` UInt32,
    `MPLS1Label` UInt32,
    `MPLS2TTL` UInt32,
    `MPLS2Label` UInt32,
    `MPLS3TTL` UInt32,
    `MPLS3Label` UInt32,
    `MPLSLastTTL` UInt32,
    `MPLSLastLabel` UInt32
) AS
SELECT
    toDate(TimeReceived) AS Date,
    TimeReceived AS Datetime,

    -- reverse() + substring(..., 13, 4) picks the first four bytes of the
    -- 16-byte address field in the byte order reinterpretAsUInt32 needs to
    -- yield the numeric IPv4 address.
    IPv4NumToString(reinterpretAsUInt32(substring(reverse(SamplerAddress), 13, 4))) AS SamplerAddress,

    -- scale the sampled counters up by the sampling rate
    Bytes * SamplingRate AS Bytes,
    Packets * SamplingRate AS Packets,

    -- EType 0x0800 = IPv4, 0x86DD = IPv6; any other EType yields 0 / ''
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(SrcAddr)), 0) AS SrcAs,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(DstAddr)), 0) AS DstAs,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(SrcAddr)), 0) AS SrcGeo,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(DstAddr)), 0) AS DstGeo,
    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
            EType = 0x86DD, IPv6NumToString(SrcAddr), '') AS SrcIp,
    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
            EType = 0x86DD, IPv6NumToString(DstAddr), '') AS DstIp,

    dictGet('dictionaries.protocols', 'name', toUInt64(Proto)) AS Proto,
    MACNumToString(toUInt64(SrcMac)) AS SrcMac,
    MACNumToString(toUInt64(DstMac)) AS DstMac,

    -- country codes are derived from the SrcGeo / DstGeo aliases computed above
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(SrcGeo)) AS SrcCountry,
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(DstGeo)) AS DstCountry,

    -- columns passed through without an explicit transformation
    SequenceNum,
    SamplingRate,
    FlowDirection,
    TimeFlowStart,
    TimeFlowEnd,
    EType,
    SrcPort,
    DstPort,
    InIf,
    OutIf,
    SrcVlan,
    DstVlan,
    VlanId,
    IngressVrfID,
    EgressVrfID,
    IPTos,
    ForwardingStatus,
    IPTTL,
    TCPFlags,
    IcmpType,
    IcmpCode,
    IPv6FlowLabel,
    FragmentId,
    FragmentOffset,
    BiFlowDirection,
    NextHop,
    NextHopAS,
    SrcNet,
    DstNet,
    HasMPLS,
    MPLSCount,
    MPLS1TTL,
    MPLS1Label,
    MPLS2TTL,
    MPLS2Label,
    MPLS3TTL,
    MPLS3Label,
    MPLSLastTTL,
    MPLSLastLabel
FROM traffic.kafka

Словари

protocols - словарь для текстового представления протоколов

-- Protocol number -> protocol name / description, loaded from a CSV file.
-- traffic.kafka_to_raw reads the `name` attribute from it.
CREATE DICTIONARY dictionaries.protocols
(
    proto       UInt8,
    name        String,
    description String
)
PRIMARY KEY proto
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/protocols.csv'
    FORMAT 'CSVWithNames'
))
LAYOUT(FLAT())
LIFETIME(MIN 0 MAX 3600)

asnv4 - конвертация ipv4 к ASN

-- IPv4 network -> ASN, loaded from a CSV file into an IP trie.
-- traffic.kafka_to_raw reads the `asn` attribute from it for IPv4 flows.
CREATE DICTIONARY dictionaries.asnv4
(
    network String,
    asn     UInt32,
    aso     String DEFAULT '??'
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/ip-asn-ipv4.csv'
    FORMAT 'CSVWithNames'
))
LAYOUT(IP_TRIE())
LIFETIME(MIN 0 MAX 3600)

asnv6 - конвертация ipv6 к ASN

-- IPv6 network -> ASN, loaded from a CSV file into an IP trie.
-- traffic.kafka_to_raw reads the `asn` attribute from it for IPv6 flows.
CREATE DICTIONARY dictionaries.asnv6
(
    network String,
    asn     UInt32,
    aso     String DEFAULT '??'
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/ip-asn-ipv6.csv'
    FORMAT 'CSVWithNames'
))
LAYOUT(IP_TRIE())
LIFETIME(MIN 0 MAX 3600)

contryen

-- geoname id -> country attributes, loaded from a CSV file.
-- traffic.kafka_to_raw reads `country_iso_code` from it using a UInt64 key,
-- so the key column is declared as UInt64 (`id` is not a ClickHouse type,
-- and the HASHED layout works with a single UInt64 key).
-- NOTE(review): the file name is spelled 'coutry-location.csv' while the
-- other sources use 'country-…'; confirm it matches the file on disk.
CREATE DICTIONARY dictionaries.contryen
(
    `geoname_id` UInt64,
    `locale_code` String,
    `continent_code` String,
    `continent_name` String,
    `country_iso_code` String,
    `country_name` String,
    `is_in_european_union` UInt8
)
PRIMARY KEY geoname_id
SOURCE(FILE(PATH '/var/lib/clickhouse/user_files/coutry-location.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(HASHED())

contryv4

-- IPv4 network -> geoname ids and flags, loaded from a CSV file into an IP trie.
-- traffic.kafka_to_raw reads the `geoname_id` attribute from it for IPv4 flows.
CREATE DICTIONARY dictionaries.contryv4
(
    network                        String,
    geoname_id                     UInt32 DEFAULT 0,
    registered_country_geoname_id  UInt32 DEFAULT 0,
    represented_country_geoname_id UInt32 DEFAULT 0,
    is_anonymous_proxy             UInt8  DEFAULT 0,
    is_satellite_provider          UInt8  DEFAULT 0
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/country-ipv4.csv'
    FORMAT 'CSVWithNames'
))
LAYOUT(IP_TRIE())
LIFETIME(MIN 0 MAX 3600)

contryv6

-- IPv6 network -> geoname ids and flags, loaded from a CSV file into an IP trie.
-- traffic.kafka_to_raw reads the `geoname_id` attribute from it for IPv6 flows.
CREATE DICTIONARY dictionaries.contryv6
(
    network                        String,
    geoname_id                     UInt32 DEFAULT 0,
    registered_country_geoname_id  UInt32 DEFAULT 0,
    represented_country_geoname_id UInt32 DEFAULT 0,
    is_anonymous_proxy             UInt8  DEFAULT 0,
    is_satellite_provider          UInt8  DEFAULT 0
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/country-ipv6.csv'
    FORMAT 'CSVWithNames'
))
LAYOUT(IP_TRIE())
LIFETIME(MIN 0 MAX 3600)

switches

-- Switch IP (string key) -> switch name, loaded from a CSV file.
-- Not referenced by the other statements on this page.
CREATE DICTIONARY dictionaries.switches
(
    ip   String,
    name String
)
PRIMARY KEY ip
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/switches.csv'
    FORMAT 'CSVWithNames'
))
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 0 MAX 3600)
giganet/sflow.1627900497.txt.gz · Last modified: 2021/08/02 13:34 by rb