Golang 日志管理项目:采集、解析和存储
{ "package": "main", "import": [{ "fmt", "log", "os", "os/signal", "syscall", "time" ], [ "github.com/Shopify/sarama", "github.com/hpcloud/tail" ] }, // LogCollector 定义日志收集器接口 type LogCollector interface { Collect() <-chan string } // KafkaCollector 实现 Kafka 日志收集器 type KafkaCollector struct { topic string partition int32 brokers []string } // Collect 从 Kafka 中收集日志消息 func (kc *KafkaCollector) Collect() <-chan string { msgChan := make(chan string) consumer, err := sarama.NewConsumer(kc.brokers, nil) if err != nil { log.Fatal(err) } partitionConsumer, err := consumer.ConsumePartition(kc.topic, kc.partition, sarama.OffsetNewest) if err != nil { log.Fatal(err) } go func() { defer consumer.Close() defer partitionConsumer.Close() for message := range partitionConsumer.Messages() { msgChan <- string(message.Value) } }() return msgChan } // SyslogCollector 实现 Syslog 日志收集器 type SyslogCollector struct { filePath string } // Collect 从 Syslog 文件中收集日志消息 func (sc *SyslogCollector) Collect() <-chan string { msgChan := make(chan string) go func() { t, err := tail.TailFile(sc.filePath, tail.Config{Follow: true}) if err != nil { log.Fatal(err) } for line := range t.Lines { msgChan <- line.Text } }() return msgChan } // LogParser 定义日志解析器接口 type LogParser interface { Parse(string) (map[string]interface{}, error) } // CustomLogParser 实现自定义日志解析器 type CustomLogParser struct { // 实现自定义解析逻辑 } // Parse 解析日志消息 func (clp *CustomLogParser) Parse(msg string) (map[string]interface{}, error) { // 解析逻辑 return nil, nil } // LogStorage 定义日志存储接口 type LogStorage interface { Store(map[string]interface{}) error } // ConsoleStorage 实现控制台日志存储 type ConsoleStorage struct { // 实现控制台存储逻辑 } // Store 存储日志消息到控制台 func (cs *ConsoleStorage) Store(data map[string]interface{}) error { // 存储逻辑 return nil } // FileStorage 实现文件日志存储 type FileStorage struct { filePath string } // Store 存储日志消息到文件 func (fs *FileStorage) Store(data map[string]interface{}) error { // 存储逻辑 return nil } func main() { // 创建日志收集器 kafkaCollector := &KafkaCollector{ topic: "logs", partition: 0, brokers: []string{"localhost:9092"} } syslogCollector := &SyslogCollector{ filePath: "/var/log/syslog" } // 创建日志解析器 customLogParser := &CustomLogParser{} // 创建日志存储器 consoleStorage := &ConsoleStorage{} fileStorage := &FileStorage{ filePath: "/var/log/myapp.log" } // 创建信号处理器,用于优雅地关闭程序 exitChan := make(chan os.Signal, 1) signal.Notify(exitChan, syscall.SIGINT, syscall.SIGTERM) // 启动日志处理协程 go func() { logCollector := LogCollector(kafkaCollector) // 使用 Kafka 收集日志 // logCollector := LogCollector(syslogCollector) // 使用 Syslog 收集日志 logParser := LogParser(customLogParser) logStorage := LogStorage(consoleStorage) // 使用控制台存储日志 // logStorage := LogStorage(fileStorage) // 使用文件存储日志 for { select { case msg := <-logCollector.Collect(): data, err := logParser.Parse(msg) if err != nil { fmt.Println("Failed to parse log:", err) continue } err = logStorage.Store(data) if err != nil { fmt.Println("Failed to store log:", err) } case <-exitChan: return } } }() // 优雅地关闭程序 <-exitChan fmt.Println("Exiting...") time.Sleep(2 * time.Second) }
原文地址: https://www.cveoy.top/t/topic/pFxQ 著作权归作者所有。请勿转载和采集!