diff --git a/include/spdlog/sinks/kafka_skin.h b/include/spdlog/sinks/kafka_skin.h new file mode 100644 index 00000000..12d62286 --- /dev/null +++ b/include/spdlog/sinks/kafka_skin.h @@ -0,0 +1,133 @@ +// Copyright(c) 2015-present, Gabi Melman & spdlog contributors. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) + +#pragma once + +// +// Custom sink for kafka +// Building and using requires librdkafka library. +// For building librdkafka library check the url below +// https://github.com/confluentinc/librdkafka +// + +#include +#include "spdlog/details/log_msg.h" +#include "spdlog/sinks/base_sink.h" +#include "spdlog/details/synchronous_factory.h" +#include "spdlog/details/null_mutex.h" +#include "spdlog/async.h" +#include + +// kafka header +#include + +namespace spdlog { +namespace sinks { + +struct kafka_sink_config +{ + std::string server_addr; + std::string produce_topic; + int32_t flush_timeout_ms = 1000; + + kafka_sink_config(std::string addr, std::string topic, int flush_timeout_ms = 1000) + : server_addr{std::move(addr)} + ,produce_topic{std::move(topic)} + ,flush_timeout_ms(flush_timeout_ms) + {} +}; + +template +class kafka_skin : public base_sink +{ +public: + kafka_skin(kafka_sink_config config) + : config_{std::move(config)} + { + try + { + std::string errstr; + conf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); + RdKafka::Conf::ConfResult confRes = conf_->set("bootstrap.servers", config_.server_addr, errstr); + if (confRes != RdKafka::Conf::CONF_OK) + { + throw_spdlog_ex(fmt_lib::format("conf set bootstrap.servers failed err:{}", errstr)); + } + + tconf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC)); + if (tconf_ == nullptr) + { + throw_spdlog_ex(fmt_lib::format("create topic config failed")); + } + + producer_.reset(RdKafka::Producer::create(conf_.get(), errstr)); + if (producer_ == nullptr) + { + throw_spdlog_ex(fmt_lib::format("create producer failed err:{}", errstr)); + } + topic_.reset(RdKafka::Topic::create(producer_.get(), config_.produce_topic, tconf_.get(), errstr)); + if (topic_ == nullptr) + { + throw_spdlog_ex(fmt_lib::format("create topic failed err:{}", errstr)); + } + } + catch (const std::exception &e) + { + throw_spdlog_ex(fmt_lib::format("error create kafka instance: {}", e.what())); + } + } + + ~kafka_skin() + { + producer_->flush(config_.flush_timeout_ms); + } + +protected: + void sink_it_(const details::log_msg &msg) override + { + producer_->produce(topic_.get(), 0, RdKafka::Producer::RK_MSG_COPY, (void *)msg.payload.data(), msg.payload.size(), NULL, NULL); + } + + void flush_() override + { + producer_->flush(config_.flush_timeout_ms); + } + +private: + kafka_sink_config config_; + std::unique_ptr producer_ = nullptr; + std::unique_ptr conf_ = nullptr; + std::unique_ptr tconf_ = nullptr; + std::unique_ptr topic_ = nullptr; +}; + +using kafka_sink_mt = kafka_skin; +using kafka_sink_st = kafka_skin; + +} // namespace sinks + +template +inline std::shared_ptr kafka_logger_mt(const std::string &logger_name, spdlog::sinks::kafka_sink_config config) +{ + return Factory::template create(logger_name, config); +} + +template +inline std::shared_ptr kafka_logger_st(const std::string &logger_name, spdlog::sinks::kafka_sink_config config) +{ + return Factory::template create(logger_name, config); +} + +template +inline std::shared_ptr kafka_logger_async_mt(std::string logger_name, spdlog::sinks::kafka_sink_config config) +{ + return Factory::template create(logger_name, config); +} + +template +inline std::shared_ptr kafka_logger_async_st(std::string logger_name, spdlog::sinks::kafka_sink_config config) +{ + return Factory::template create(logger_name, config); +} + +} // namespace spdlog \ No newline at end of file