User Tools

Site Tools


giganet:sflow

This is an old revision of the document!


Схема мониторинга sflow статистики

Задействованные сервера

  • goflow - сервер на котором крутится kafka и goflow2 приложение по сбору статистики
  • clickhouse - сервер с базой данных clickhouse, хранение и обработка статистики

Задействованные приложения

  • goflow2
  • kafka
  • clickhouse

Общая схема мониторинга

switch sflow -> goflow2 -> kafka -> clickhouse
  1. switch отправляет данные sflow на UDP порт 6343
  2. goflow2 принимает поток данных, разбирает данные и сформировав protobuf отправляет в kafka
  3. kafka просто получает данные, данные в ней хранятся на протяжении 12 часов
  4. clickhouse
    1. kafka - таблица подключена на чтение в kafka, данные читаются только 1 раз
    2. kafka_to_raw - представление которое выбирает данные с kafka и добавляя нужные поля вставляет в raw
    3. raw - таблица с остаточными сырыми данными

switch

примеры конфигов для свитчей

goflow2

пример настройки goflow2

kafka

пример настройки kafka

clickhouse

таблица kafka

CREATE TABLE traffic.kafka
(
    `TimeReceived` UInt64,
    `SequenceNum` UInt64,
    `SamplingRate` UInt64,
    `FlowDirection` UInt32,

    `SamplerAddress` FixedString(16),

    `TimeFlowStart` UInt64,
    `TimeFlowEnd` UInt64,

    `Bytes` UInt64,
    `Packets` UInt64,

    `SrcAddr` FixedString(16),
    `DstAddr` FixedString(16),

    `EType` UInt32,
    `Proto` UInt32,

    `SrcPort` UInt32,
    `DstPort` UInt32,

    `InIf` UInt32,
    `OutIf` UInt32,

    `SrcMac` String,
    `DstMac` String,

    `SrcVlan` UInt32,
    `DstVlan` UInt32,
    `VlanId` UInt32,

    `IngressVrfID` UInt32,
    `EgressVrfID` UInt32,

    `IPTos` UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL` UInt32,
    `TCPFlags` UInt32,
    `IcmpType` UInt32,
    `IcmpCode` UInt32,
    `IPv6FlowLabel` UInt32,

    `FragmentId` UInt32,
    `FragmentOffset` UInt32,
    `BiFlowDirection` UInt32,

    `SrcAs` UInt32,
    `DstAs` UInt32,

    `NextHop` FixedString(16),
    `NextHopAS` UInt32,

    `SrcNet` UInt32,
    `DstNet` UInt32,

    `HasMPLS` UInt8,
    `MPLSCount` UInt32,
    `MPLS1TTL` UInt32,
    `MPLS1Label` UInt32,
    `MPLS2TTL` UInt32,
    `MPLS2Label` UInt32,
    `MPLS3TTL` UInt32,
    `MPLS3Label` UInt32,
    `MPLSLastTTL` UInt32,
    `MPLSLastLabel` UInt32
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = '172.17.172.111:9092',
    kafka_topic_list = 'sflow',
    kafka_group_name = 'clickhouse',
    kafka_format = 'Protobuf',
    kafka_schema = 'flow.proto:FlowMessage',
    kafka_num_consumers = 6

таблица raw

CREATE TABLE traffic.raw
(
    `Date` Date,
    `Datetime` DateTime,

    `SequenceNum` UInt64,
    `SamplingRate` UInt64,
    `FlowDirection` UInt32,

    `SamplerAddress` String,

    `TimeFlowStart` DateTime,
    `TimeFlowEnd` DateTime,

    `Bytes` UInt64,
    `Packets` UInt64,

    `SrcIp` String,
    `DstIp` String,

    `EType` UInt32,
    `Proto` String,

    `SrcPort` UInt32,
    `DstPort` UInt32,

    `InIf` UInt32,
    `OutIf` UInt32,

    `SrcMac` String,
    `DstMac` String,

    `SrcVlan` UInt32,
    `DstVlan` UInt32,
    `VlanId` UInt32,

    `IngressVrfID` UInt32,
    `EgressVrfID` UInt32,

    `IPTos` UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL` UInt32,
    `TCPFlags` UInt32,
    `IcmpType` UInt32,
    `IcmpCode` UInt32,
    `IPv6FlowLabel` UInt32,

    `FragmentId` UInt32,
    `FragmentOffset` UInt32,
    `BiFlowDirection` UInt32,

    `SrcGeo` UInt32,
    `DstGeo` UInt32,
    `SrcAs` UInt32,
    `DstAs` UInt32,

    `SrcCountry` String,
    `DstCountry` String,

    `NextHop` String,
    `NextHopAS` UInt32,

    `SrcNet` String,
    `DstNet` String,

    `HasMPLS` UInt8,
    `MPLSCount` UInt32,
    `MPLS1TTL` UInt32,
    `MPLS1Label` UInt32,
    `MPLS2TTL` UInt32,
    `MPLS2Label` UInt32,
    `MPLS3TTL` UInt32,
    `MPLS3Label` UInt32,
    `MPLSLastTTL` UInt32,
    `MPLSLastLabel` UInt32)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(Date)
ORDER BY Datetime
TTL Date + toIntervalDay(7)
SETTINGS index_granularity = 8192

таблица kafka_to_raw

CREATE MATERIALIZED VIEW traffic.kafka_to_raw TO traffic.raw
(
    `Date` Date,
    `Datetime` DateTime,

    `SequenceNum` UInt64,
    `SamplingRate` UInt64,
    `FlowDirection` UInt32,

    `SamplerAddress` String,

    `TimeFlowStart` DateTime,
    `TimeFlowEnd` DateTime,

    `Bytes` UInt64,
    `Packets` UInt64,

    `SrcIp` String,
    `DstIp` String,

    `EType` UInt32,
    `Proto` String,

    `SrcPort` UInt32,
    `DstPort` UInt32,

    `InIf` UInt32,
    `OutIf` UInt32,

    `SrcMac` String,
    `DstMac` String,

    `SrcVlan` UInt32,
    `DstVlan` UInt32,
    `VlanId` UInt32,

    `IngressVrfID` UInt32,
    `EgressVrfID` UInt32,

    `IPTos` UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL` UInt32,
    `TCPFlags` UInt32,
    `IcmpType` UInt32,
    `IcmpCode` UInt32,
    `IPv6FlowLabel` UInt32,

    `FragmentId` UInt32,
    `FragmentOffset` UInt32,
    `BiFlowDirection` UInt32,

    `SrcAs` UInt32,
    `DstAs` UInt32,

    `SrcCountry` String,
    `DstCountry` String,

    `NextHop` String,
    `NextHopAS` UInt32,

    `SrcNet` String,
    `DstNet` String,

    `HasMPLS` UInt8,
    `MPLSCount` UInt32,
    `MPLS1TTL` UInt32,
    `MPLS1Label` UInt32,
    `MPLS2TTL` UInt32,
    `MPLS2Label` UInt32,
    `MPLS3TTL` UInt32,
    `MPLS3Label` UInt32,
    `MPLSLastTTL` UInt32,
    `MPLSLastLabel` UInt32
) AS
SELECT
    toDate(TimeReceived) AS Date,
    TimeReceived as Datetime,
    IPv4NumToString(reinterpretAsUInt32(substring(reverse(SamplerAddress), 13, 4))) AS SamplerAddress,
    Bytes * SamplingRate as Bytes,
    Packets * SamplingRate as Packets,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(SrcAddr)), 0) AS SrcAs,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(DstAddr)), 0) AS DstAs,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(SrcAddr)), 0) AS SrcGeo,
    multiIf(EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
            EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(DstAddr)), 0) AS DstGeo,
    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
            EType = 0x86DD, IPv6NumToString(SrcAddr), '') AS SrcIp,
    multiIf(EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
            EType = 0x86DD, IPv6NumToString(DstAddr), '') AS DstIp,
    dictGet('dictionaries.protocols', 'name', toUInt64(Proto)) AS Proto,
    MACNumToString(toUInt64(SrcMac)) AS SrcMac,
    MACNumToString(toUInt64(DstMac)) AS DstMac,
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(SrcGeo)) AS SrcCountry,
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(DstGeo)) AS DstCountry,

    SequenceNum,
    SamplingRate,
    FlowDirection,
    TimeFlowStart,
    TimeFlowEnd,
    EType,
    SrcPort,
    DstPort,
    InIf,
    OutIf,
    SrcVlan,
    DstVlan,
    VlanId,
    IngressVrfID,
    EgressVrfID,
    IPTos,
    ForwardingStatus,
    IPTTL,
    TCPFlags,
    IcmpType,
    IcmpCode,
    IPv6FlowLabel,
    FragmentId,
    FragmentOffset,
    BiFlowDirection,
    NextHop,
    NextHopAS,
    SrcNet,
    DstNet,
    HasMPLS,
    MPLSCount,
    MPLS1TTL,
    MPLS1Label,
    MPLS2TTL,
    MPLS2Label,
    MPLS3TTL,
    MPLS3Label,
    MPLSLastTTL,
    MPLSLastLabel
FROM traffic.kafka
giganet/sflow.1627899022.txt.gz · Last modified: 2021/08/02 13:10 by rb