This is an old revision of the document!
switch sflow -> goflow2 -> kafka -> clickhouse
примеры конфигов для свитчей
пример настройки goflow2
пример настройки kafka
-- Kafka consumer table for flow records.
--
-- Reads Protobuf-encoded messages (schema flow.proto, message FlowMessage)
-- from the 'sflow' topic as consumer group 'clickhouse'. The columns are the
-- message fields that get decoded; addresses are kept as raw 16-byte values
-- (FixedString(16)) and timestamps as plain UInt64 numbers.
-- The materialized view traffic.kafka_to_raw selects from this table.
CREATE TABLE traffic.kafka
(
    -- Timing / sampling metadata
    `TimeReceived`     UInt64,           -- numeric timestamp; presumably unix seconds - TODO confirm
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   FixedString(16),  -- raw address bytes of the exporting device
    `TimeFlowStart`    UInt64,
    `TimeFlowEnd`      UInt64,

    -- Counters
    `Bytes`            UInt64,
    `Packets`          UInt64,

    -- L3 / L4 headers
    `SrcAddr`          FixedString(16),  -- raw address bytes
    `DstAddr`          FixedString(16),  -- raw address bytes
    `EType`            UInt32,
    `Proto`            UInt32,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,

    -- Interfaces / L2
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,

    -- IP / TCP / ICMP details
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,

    -- Routing
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `NextHop`          FixedString(16),  -- raw address bytes
    `NextHopAS`        UInt32,
    `SrcNet`           UInt32,
    `DstNet`           UInt32,

    -- MPLS
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list   = '172.17.172.111:9092',
    kafka_topic_list    = 'sflow',
    kafka_group_name    = 'clickhouse',
    kafka_format        = 'Protobuf',
    -- NOTE(review): the schema file is referenced by relative name, so it is
    -- presumably resolved against the server's format schema directory - confirm.
    kafka_schema        = 'flow.proto:FlowMessage',
    -- NOTE(review): verify the topic has at least this many partitions.
    kafka_num_consumers = 6;
-- Storage table for flow records.
--
-- Holds one row per flow record, with addresses, protocol and MAC values kept
-- as strings and timestamps as DateTime. It is the target (TO) table of the
-- materialized view traffic.kafka_to_raw.
-- Data is partitioned per day of `Date`, sorted by `Datetime`, and expires
-- 7 days after `Date` via the TTL clause.
CREATE TABLE traffic.raw
(
    -- Timing / sampling metadata
    `Date`             Date,
    `Datetime`         DateTime,
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   String,
    `TimeFlowStart`    DateTime,
    `TimeFlowEnd`      DateTime,

    -- Counters
    `Bytes`            UInt64,
    `Packets`          UInt64,

    -- L3 / L4 headers
    `SrcIp`            String,
    `DstIp`            String,
    `EType`            UInt32,
    `Proto`            String,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,

    -- Interfaces / L2
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,

    -- IP / TCP / ICMP details
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,

    -- Enrichment and routing
    `SrcGeo`           UInt32,
    `DstGeo`           UInt32,
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `SrcCountry`       String,
    `DstCountry`       String,
    `NextHop`          String,
    `NextHopAS`        UInt32,
    `SrcNet`           String,
    `DstNet`           String,

    -- MPLS
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(Date)   -- one partition per calendar day
ORDER BY Datetime
TTL Date + toIntervalDay(7)     -- rows expire 7 days after their Date
SETTINGS index_granularity = 8192;
-- Materialized view that copies rows consumed by traffic.kafka into
-- traffic.raw, converting and enriching them on the way:
--   * numeric timestamps become Date / DateTime values;
--   * raw 16-byte addresses are rendered as IPv4 / IPv6 text, chosen by EType;
--   * Bytes and Packets are multiplied by SamplingRate;
--   * AS number, geoname id, country code and protocol name are looked up in
--     dictionaries ('dictionaries.*') that are defined outside this statement.
--
-- The column list declares every column the SELECT produces, including
-- SrcGeo / DstGeo, which traffic.raw stores as well.
CREATE MATERIALIZED VIEW traffic.kafka_to_raw TO traffic.raw
(
    `Date`             Date,
    `Datetime`         DateTime,
    `SequenceNum`      UInt64,
    `SamplingRate`     UInt64,
    `FlowDirection`    UInt32,
    `SamplerAddress`   String,
    `TimeFlowStart`    DateTime,
    `TimeFlowEnd`      DateTime,
    `Bytes`            UInt64,
    `Packets`          UInt64,
    `SrcIp`            String,
    `DstIp`            String,
    `EType`            UInt32,
    `Proto`            String,
    `SrcPort`          UInt32,
    `DstPort`          UInt32,
    `InIf`             UInt32,
    `OutIf`            UInt32,
    `SrcMac`           String,
    `DstMac`           String,
    `SrcVlan`          UInt32,
    `DstVlan`          UInt32,
    `VlanId`           UInt32,
    `IngressVrfID`     UInt32,
    `EgressVrfID`      UInt32,
    `IPTos`            UInt32,
    `ForwardingStatus` UInt32,
    `IPTTL`            UInt32,
    `TCPFlags`         UInt32,
    `IcmpType`         UInt32,
    `IcmpCode`         UInt32,
    `IPv6FlowLabel`    UInt32,
    `FragmentId`       UInt32,
    `FragmentOffset`   UInt32,
    `BiFlowDirection`  UInt32,
    `SrcGeo`           UInt32,
    `DstGeo`           UInt32,
    `SrcAs`            UInt32,
    `DstAs`            UInt32,
    `SrcCountry`       String,
    `DstCountry`       String,
    `NextHop`          String,
    `NextHopAS`        UInt32,
    `SrcNet`           String,
    `DstNet`           String,
    `HasMPLS`          UInt8,
    `MPLSCount`        UInt32,
    `MPLS1TTL`         UInt32,
    `MPLS1Label`       UInt32,
    `MPLS2TTL`         UInt32,
    `MPLS2Label`       UInt32,
    `MPLS3TTL`         UInt32,
    `MPLS3Label`       UInt32,
    `MPLSLastTTL`      UInt32,
    `MPLSLastLabel`    UInt32
) AS
SELECT
    -- Timestamps: the source columns are UInt64, so convert them explicitly
    -- instead of relying on implicit conversion into the DateTime columns.
    toDate(TimeReceived) AS Date,
    toDateTime(TimeReceived) AS Datetime,
    toDateTime(TimeFlowStart) AS TimeFlowStart,
    toDateTime(TimeFlowEnd) AS TimeFlowEnd,

    -- IPv4 extraction used throughout this query:
    -- reverse() + substring(.., 13, 4) yields the first four bytes of the
    -- 16-byte field in reversed order, which reinterpretAsUInt32 then reads as
    -- the numeric IPv4 address expected by IPv4NumToString and the *v4 dictionaries.
    -- The sampler address is always rendered as IPv4.
    IPv4NumToString(reinterpretAsUInt32(substring(reverse(SamplerAddress), 13, 4))) AS SamplerAddress,

    -- Counters multiplied by the sampling rate
    -- (presumably to estimate the unsampled totals).
    Bytes * SamplingRate AS Bytes,
    Packets * SamplingRate AS Packets,

    -- AS number lookup: EType 0x0800 selects the IPv4 dictionary, 0x86DD the
    -- IPv6 dictionary; any other EType yields 0.
    multiIf(
        EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
        EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(SrcAddr)),
        0
    ) AS SrcAs,
    multiIf(
        EType = 0x0800, dictGetUInt32('dictionaries.asnv4', 'asn', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
        EType = 0x86DD, dictGetUInt32('dictionaries.asnv6', 'asn', tuple(DstAddr)),
        0
    ) AS DstAs,

    -- Geoname id lookup, same EType dispatch; 0 when the EType is neither.
    multiIf(
        EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4)))),
        EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(SrcAddr)),
        0
    ) AS SrcGeo,
    multiIf(
        EType = 0x0800, dictGetUInt32('dictionaries.contryv4', 'geoname_id', tuple(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4)))),
        EType = 0x86DD, dictGetUInt32('dictionaries.contryv6', 'geoname_id', tuple(DstAddr)),
        0
    ) AS DstGeo,

    -- Textual source / destination address; empty string for other ETypes.
    multiIf(
        EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
        EType = 0x86DD, IPv6NumToString(SrcAddr),
        ''
    ) AS SrcIp,
    multiIf(
        EType = 0x0800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
        EType = 0x86DD, IPv6NumToString(DstAddr),
        ''
    ) AS DstIp,

    -- Protocol number -> name via dictionary.
    dictGet('dictionaries.protocols', 'name', toUInt64(Proto)) AS Proto,

    -- MAC addresses arrive as numeric text and are formatted as MAC strings.
    MACNumToString(toUInt64(SrcMac)) AS SrcMac,
    MACNumToString(toUInt64(DstMac)) AS DstMac,

    -- Country code from the geoname ids computed above (SrcGeo / DstGeo aliases).
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(SrcGeo)) AS SrcCountry,
    dictGetString('dictionaries.contryen', 'country_iso_code', toUInt64(DstGeo)) AS DstCountry,

    -- Columns passed through unchanged.
    SequenceNum,
    SamplingRate,
    FlowDirection,
    EType,
    SrcPort,
    DstPort,
    InIf,
    OutIf,
    SrcVlan,
    DstVlan,
    VlanId,
    IngressVrfID,
    EgressVrfID,
    IPTos,
    ForwardingStatus,
    IPTTL,
    TCPFlags,
    IcmpType,
    IcmpCode,
    IPv6FlowLabel,
    FragmentId,
    FragmentOffset,
    BiFlowDirection,
    -- NOTE(review): NextHop is FixedString(16) in traffic.kafka and is copied
    -- as raw bytes into a String column; unlike SrcIp / DstIp it is not
    -- rendered as text - confirm this is intended.
    NextHop,
    NextHopAS,
    -- NOTE(review): SrcNet / DstNet are UInt32 in traffic.kafka but String in
    -- traffic.raw, so they rely on conversion at insert time - confirm.
    SrcNet,
    DstNet,
    HasMPLS,
    MPLSCount,
    MPLS1TTL,
    MPLS1Label,
    MPLS2TTL,
    MPLS2Label,
    MPLS3TTL,
    MPLS3Label,
    MPLSLastTTL,
    MPLSLastLabel
FROM traffic.kafka;