This is an old revision of the document!
Конвейер: switch (sFlow) -> goflow2 -> Kafka -> ClickHouse
Примеры конфигураций sFlow для коммутаторов
Пример настройки goflow2
Пример настройки Kafka
-- Kafka engine table: reads goflow2 flow messages from topic 'sflow' on broker
-- 172.17.172.111:9092, decoded as Protobuf (schema file flow.proto, message FlowMessage),
-- with 6 consumers in consumer group 'clickhouse'.
-- Its rows are consumed by the materialized view traffic.kafka_to_raw.
-- Addresses (SamplerAddress, SrcAddr, DstAddr, NextHop) are kept as raw 16-byte values here;
-- the view turns them into text. SrcAs/DstAs from the exporter are not used downstream:
-- the view recomputes them from the dictionaries.asnv4/asnv6 lookups.
CREATE TABLE traffic.kafka
(
    `TimeReceived`     UInt64,
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   FixedString(16),
    `TimeFlowStart`    UInt64,
    `TimeFlowEnd`      UInt64,
    `Bytes`            UInt64,
    `Packets`          UInt64,
    `SrcAddr`          FixedString(16),
    `DstAddr`          FixedString(16),
    `EType`            UInt32,
    `Proto`            UInt32,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `NextHop`          FixedString(16),
    `NextHopAS`        UInt32,
    `SrcNet`           UInt32,
    `DstNet`           UInt32,
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list   = '172.17.172.111:9092',
    kafka_topic_list    = 'sflow',
    kafka_group_name    = 'clickhouse',
    kafka_format        = 'Protobuf',
    kafka_schema        = 'flow.proto:FlowMessage',
    kafka_num_consumers = 6
-- Decoded flow records, filled by the materialized view traffic.kafka_to_raw.
-- Storage layout: one partition per day (toYYYYMMDD(Date)), rows sorted by Datetime,
-- and rows expire 7 days after their Date via the table TTL.
CREATE TABLE traffic.raw
(
    `Date`             Date,
    `Datetime`         DateTime,
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   String,
    `TimeFlowStart`    DateTime,
    `TimeFlowEnd`      DateTime,
    `Bytes`            UInt64,
    `Packets`          UInt64,
    `SrcIp`            String,
    `DstIp`            String,
    `EType`            UInt32,
    `Proto`            String,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,
    `SrcGeo`           UInt32,
    `DstGeo`           UInt32,
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `SrcCountry`       String,
    `DstCountry`       String,
    `NextHop`          String,
    `NextHopAS`        UInt32,
    `SrcNet`           String,
    `DstNet`           String,
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(Date)
ORDER BY Datetime
TTL Date + toIntervalDay(7)
SETTINGS index_granularity = 8192
-- Materialized view: each block consumed from traffic.kafka is transformed and inserted
-- into traffic.raw.
--  * Addresses arrive as FixedString(16). The IPv4 decode used below
--    (reverse() + substring(13, 4) + reinterpretAsUInt32) assumes the 4 address bytes sit
--    at the start of the field, followed by zero padding.
--    IPv4 vs IPv6 is selected by EType (0x0800 = IPv4, 0x86DD = IPv6); any other EType
--    yields '' for text columns and 0 for numeric lookups.
--  * NextHop is decoded to text the same way as SrcIp/DstIp instead of copying the raw
--    16 bytes into the String column.
--    NOTE(review): the address family for NextHop follows the flow's EType; a next hop of
--    the other family would be mis-decoded.
--  * Bytes/Packets are multiplied by the sampling rate to estimate real traffic.
--    greatest(SamplingRate, 1) prevents flows reported with SamplingRate = 0 from being
--    zeroed out.
--  * SrcAs/DstAs, SrcGeo/DstGeo and SrcCountry/DstCountry come from the dictionaries.*
--    lookups.
--  * Proto is translated to a name via dictionaries.protocols, and MACs are rendered as text.
--  * SrcGeo/DstGeo are listed in the view's columns so its structure matches traffic.raw.
-- Several aliases reuse a source column name (SamplerAddress, Bytes, Packets, Proto, SrcMac,
-- DstMac, NextHop). This relies on ClickHouse resolving the name inside the alias's own
-- expression to the traffic.kafka column.
CREATE MATERIALIZED VIEW traffic.kafka_to_raw TO traffic.raw
(
    `Date`             Date,
    `Datetime`         DateTime,
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   String,
    `TimeFlowStart`    DateTime,
    `TimeFlowEnd`      DateTime,
    `Bytes`            UInt64,
    `Packets`          UInt64,
    `SrcIp`            String,
    `DstIp`            String,
    `EType`            UInt32,
    `Proto`            String,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,
    `SrcGeo`           UInt32,
    `DstGeo`           UInt32,
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `SrcCountry`       String,
    `DstCountry`       String,
    `NextHop`          String,
    `NextHopAS`        UInt32,
    `SrcNet`           String,
    `DstNet`           String,
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
)
AS SELECT
    -- TimeReceived is a UInt64 epoch value; convert explicitly so the result does not
    -- depend on how the server version interprets integer -> Date.
    toDate(toDateTime(TimeReceived)) AS Date,
    toDateTime(TimeReceived) AS Datetime,

    -- The exporter address is always decoded as IPv4.
    IPv4NumToString(reinterpretAsUInt32(substring(reverse(SamplerAddress), 13, 4))) AS SamplerAddress,

    -- Scale sampled counters; treat an unreported (0) sampling rate as 1.
    Bytes * greatest(SamplingRate, 1) AS Bytes,
    Packets * greatest(SamplingRate, 1) AS Packets,

    -- Autonomous system of source/destination.
    multiIf(
        EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
        EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(SrcAddr)),
        0
    ) AS SrcAs,
    multiIf(
        EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
        EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(DstAddr)),
        0
    ) AS DstAs,

    -- geoname_id of source/destination; also the key for SrcCountry/DstCountry below.
    multiIf(
        EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
        EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(SrcAddr)),
        0
    ) AS SrcGeo,
    multiIf(
        EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
        EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(DstAddr)),
        0
    ) AS DstGeo,

    -- Textual source/destination/next-hop addresses.
    multiIf(
        EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
        EType = 0x86DD, IPv6NumToString(SrcAddr),
        ''
    ) AS SrcIp,
    multiIf(
        EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
        EType = 0x86DD, IPv6NumToString(DstAddr),
        ''
    ) AS DstIp,
    multiIf(
        EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(NextHop), 13, 4))),
        EType = 0x86DD, IPv6NumToString(NextHop),
        ''
    ) AS NextHop,

    dictGet('dictionaries.protocols', 'name', toUInt64(Proto)) AS Proto,
    MACNumToString(toUInt64(SrcMac)) AS SrcMac,
    MACNumToString(toUInt64(DstMac)) AS DstMac,
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(SrcGeo)) AS SrcCountry,
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(DstGeo)) AS DstCountry,

    -- Pass-through columns.
    SequenceNum,
    SamplingRate,
    FlowDirection,
    TimeFlowStart,
    TimeFlowEnd,
    EType,
    SrcPort,
    DstPort,
    InIf,
    OutIf,
    SrcVlan,
    DstVlan,
    VlanId,
    IngressVrfID,
    EgressVrfID,
    IPTos,
    ForwardingStatus,
    IPTTL,
    TCPFlags,
    IcmpType,
    IcmpCode,
    IPv6FlowLabel,
    FragmentId,
    FragmentOffset,
    BiFlowDirection,
    NextHopAS,
    SrcNet,
    DstNet,
    HasMPLS,
    MPLSCount,
    MPLS1TTL,
    MPLS1Label,
    MPLS2TTL,
    MPLS2Label,
    MPLS3TTL,
    MPLS3Label,
    MPLSLastTTL,
    MPLSLastLabel
FROM traffic.kafka
protocols — словарь для текстового представления IP-протоколов (номер протокола → название и описание)
-- Protocol number -> `name` / `description` lookup, loaded from protocols.csv
-- (CSVWithNames: the file must start with a header row).
-- traffic.kafka_to_raw uses `name` to fill the Proto column.
-- NOTE(review): ClickHouse documents simple (FLAT/HASHED) dictionary keys as UInt64;
-- confirm the UInt8 declaration behaves as intended on this server version.
CREATE DICTIONARY dictionaries.protocols
(
    `proto`       UInt8,
    `name`        String,
    `description` String
)
PRIMARY KEY proto
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/protocols.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(FLAT())
asnv4 — сопоставление IPv4-адреса с номером автономной системы (ASN)
-- IPv4 network -> autonomous system lookup (IP_TRIE layout keyed by the `network` column).
-- traffic.kafka_to_raw queries it with a UInt32 IPv4 key to fill SrcAs/DstAs.
-- `aso` has the default value '??'.
CREATE DICTIONARY dictionaries.asnv4
(
    `network` String,
    `asn`     UInt32,
    `aso`     String DEFAULT '??'
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/ip-asn-ipv4.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
asnv6 — сопоставление IPv6-адреса с номером автономной системы (ASN)
-- IPv6 network -> autonomous system lookup (IP_TRIE layout keyed by the `network` column).
-- traffic.kafka_to_raw queries it with the 16-byte address to fill SrcAs/DstAs.
-- `aso` has the default value '??'.
CREATE DICTIONARY dictionaries.asnv6
(
    `network` String,
    `asn`     UInt32,
    `aso`     String DEFAULT '??'
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/ip-asn-ipv6.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
contryen — справочник стран: geoname_id → код ISO, название страны и континента
-- Country reference data: geoname_id -> continent/country names and ISO country code.
-- The geoname_id values come from dictionaries.contryv4/contryv6. traffic.kafka_to_raw reads
-- `country_iso_code` to fill SrcCountry/DstCountry.
-- The key column was declared with type `id`, which is not a ClickHouse data type.
-- A simple-key HASHED dictionary is keyed by UInt64, so the key is declared as UInt64.
-- NOTE(review): "coutry-location.csv" looks like a typo of "country-..." (compare
-- country-ipv4.csv / country-ipv6.csv); confirm the actual on-disk file name.
CREATE DICTIONARY dictionaries.contryen
(
    `geoname_id`           UInt64,
    `locale_code`          String,
    `continent_code`       String,
    `continent_name`       String,
    `country_iso_code`     String,
    `country_name`         String,
    `is_in_european_union` UInt8
)
PRIMARY KEY geoname_id
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/coutry-location.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(HASHED())
contryv4 — сопоставление IPv4-сети с geoname_id страны
-- IPv4 network -> country geoname_id lookup (IP_TRIE layout keyed by `network`).
-- traffic.kafka_to_raw reads `geoname_id` with a UInt32 IPv4 key to fill SrcGeo/DstGeo.
-- Every attribute defaults to 0.
CREATE DICTIONARY dictionaries.contryv4
(
    `network`                        String,
    `geoname_id`                     UInt32 DEFAULT 0,
    `registered_country_geoname_id`  UInt32 DEFAULT 0,
    `represented_country_geoname_id` UInt32 DEFAULT 0,
    `is_anonymous_proxy`             UInt8  DEFAULT 0,
    `is_satellite_provider`          UInt8  DEFAULT 0
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/country-ipv4.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
contryv6 — сопоставление IPv6-сети с geoname_id страны
-- IPv6 network -> country geoname_id lookup (IP_TRIE layout keyed by `network`).
-- traffic.kafka_to_raw reads `geoname_id` with the 16-byte address to fill SrcGeo/DstGeo.
-- Every attribute defaults to 0.
CREATE DICTIONARY dictionaries.contryv6
(
    `network`                        String,
    `geoname_id`                     UInt32 DEFAULT 0,
    `registered_country_geoname_id`  UInt32 DEFAULT 0,
    `represented_country_geoname_id` UInt32 DEFAULT 0,
    `is_anonymous_proxy`             UInt8  DEFAULT 0,
    `is_satellite_provider`          UInt8  DEFAULT 0
)
PRIMARY KEY network
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/country-ipv6.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
switches — справочник коммутаторов: IP-адрес → имя
-- Switch IP -> name lookup, loaded from switches.csv (with a header row).
-- The key is a String, so the dictionary uses the complex-key COMPLEX_KEY_HASHED layout.
CREATE DICTIONARY dictionaries.switches
(
    `ip`   String,
    `name` String
)
PRIMARY KEY ip
SOURCE(FILE(
    PATH '/var/lib/clickhouse/user_files/switches.csv'
    FORMAT 'CSVWithNames'
))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(COMPLEX_KEY_HASHED())