Skip to content

Commit

Permalink
update 0.1.10docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Will-Liang committed Aug 16, 2023
1 parent ec50f2b commit f6eceeb
Show file tree
Hide file tree
Showing 37 changed files with 5,152 additions and 0 deletions.
207 changes: 207 additions & 0 deletions docs/_modules/liangutil/kafkautils.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
<!DOCTYPE html>
<html class="writer-html5" lang="zh-CN" >
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>liangutil.kafkautils &mdash; liangutil 0.1.10 文档</title>
<link rel="stylesheet" href="../../_static/pygments.css" type="text/css" />
<link rel="stylesheet" href="../../_static/css/theme.css" type="text/css" />
<!--[if lt IE 9]>
<script src="../../_static/js/html5shiv.min.js"></script>
<![endif]-->

<script src="../../_static/jquery.js?v=5d32c60e"></script>
<script src="../../_static/_sphinx_javascript_frameworks_compat.js?v=2cd50e6c"></script>
<script data-url_root="../../" id="documentation_options" src="../../_static/documentation_options.js?v=bdd39a68"></script>
<script src="../../_static/doctools.js?v=888ff710"></script>
<script src="../../_static/sphinx_highlight.js?v=4825356b"></script>
<script src="../../_static/translations.js?v=beaddf03"></script>
<script src="../../_static/js/theme.js"></script>
<link rel="index" title="索引" href="../../genindex.html" />
<link rel="search" title="搜索" href="../../search.html" />
</head>

<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >



<a href="../../index.html" class="icon icon-home">
liangutil
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="搜索文档" aria-label="搜索文档" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div><div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="导航菜单">
<!-- Local TOC -->
<div class="local-toc"></div>
</div>
</div>
</nav>

<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"><nav class="wy-nav-top" aria-label="移动版导航菜单" >
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../index.html">liangutil</a>
</nav>

<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="页面导航">
<ul class="wy-breadcrumbs">
<li><a href="../../index.html" class="icon icon-home" aria-label="Home"></a></li>
<li class="breadcrumb-item"><a href="../index.html">模块代码</a></li>
<li class="breadcrumb-item active">liangutil.kafkautils</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">

<h1>liangutil.kafkautils 源代码</h1><div class="highlight"><pre>
<span></span><span class="c1"># -*- coding: utf-8 -*-</span>
<span class="kn">from</span> <span class="nn">confluent_kafka</span> <span class="kn">import</span> <span class="n">Producer</span><span class="p">,</span> <span class="n">Consumer</span>

<span class="kn">from</span> <span class="nn">liangutil.liangutils</span> <span class="kn">import</span> <span class="n">get_nowdatetime</span>


<div class="viewcode-block" id="kafka_callback"><a class="viewcode-back" href="../../liangutil.html#liangutil.kafkautils.kafka_callback">[文档]</a><span class="k">def</span> <span class="nf">kafka_callback</span><span class="p">(</span><span class="n">err</span><span class="p">,</span> <span class="n">msg</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;kafka的回调函数(不要直接调用)&quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">err</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;Failed to deliver message: </span><span class="si">{0}</span><span class="s2"> || </span><span class="si">{1}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">err</span><span class="p">,</span> <span class="n">get_nowdatetime</span><span class="p">()))</span>
<span class="k">else</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span>
<span class="s2">&quot;Message produced success in : </span><span class="si">{0}</span><span class="s2"> || </span><span class="si">{1}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">msg</span><span class="o">.</span><span class="n">topic</span><span class="p">(),</span> <span class="n">get_nowdatetime</span><span class="p">()</span>
<span class="p">)</span>
<span class="p">)</span></div>


<div class="viewcode-block" id="kafka_producer"><a class="viewcode-back" href="../../liangutil.html#liangutil.kafkautils.kafka_producer">[文档]</a><span class="k">def</span> <span class="nf">kafka_producer</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">message</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">broker_ips</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;向kafka发送消息</span>

<span class="sd"> Args:</span>
<span class="sd"> topic(str): 将要发送的 Kafka 主题</span>
<span class="sd"> message(str): 要发送的消息</span>
<span class="sd"> key(str):用于分区的键</span>
<span class="sd"> broker_ips(list): Kafka broker IP 地址列表</span>

<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># 用逗号分隔 broker_ips 列表以生成 bootstrap.servers 配置值。</span>
<span class="n">bootstrap_servers</span> <span class="o">=</span> <span class="s1">&#39;,&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">broker_ips</span><span class="p">)</span>

<span class="c1"># 创建 Kafka 配置和生产者实例</span>
<span class="n">conf</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;bootstrap.servers&#39;</span><span class="p">:</span> <span class="n">bootstrap_servers</span><span class="p">}</span>
<span class="n">p</span> <span class="o">=</span> <span class="n">Producer</span><span class="p">(</span><span class="n">conf</span><span class="p">)</span>
<span class="c1"># 生产并发送消息</span>
<span class="n">p</span><span class="o">.</span><span class="n">produce</span><span class="p">(</span>
<span class="n">topic</span><span class="p">,</span>
<span class="n">value</span><span class="o">=</span><span class="n">message</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">),</span>
<span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">),</span>
<span class="n">callback</span><span class="o">=</span><span class="n">kafka_callback</span><span class="p">,</span>
<span class="p">)</span>
<span class="c1"># 等待未完成的消息,确保所有消息都已发送</span>
<span class="n">p</span><span class="o">.</span><span class="n">flush</span><span class="p">(</span><span class="n">timeout</span><span class="o">=</span><span class="mi">30</span><span class="p">)</span></div>


<div class="viewcode-block" id="kafka_consumer"><a class="viewcode-back" href="../../liangutil.html#liangutil.kafkautils.kafka_consumer">[文档]</a><span class="k">def</span> <span class="nf">kafka_consumer</span><span class="p">(</span>
<span class="n">broker_ips</span><span class="p">,</span>
<span class="n">topics</span><span class="p">,</span>
<span class="n">group_id</span><span class="p">,</span>
<span class="n">auto_offset_reset</span><span class="o">=</span><span class="s1">&#39;earliest&#39;</span><span class="p">,</span>
<span class="n">timeout</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span>
<span class="n">message_limit</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;Kafka消费者</span>

<span class="sd"> Args:</span>
<span class="sd"> broker_ips(list):Kafka broker IP 地址列表</span>
<span class="sd"> topics(list):要订阅的 Kafka 主题列表</span>
<span class="sd"> group_id(str): 消费者组 ID</span>
<span class="sd"> auto_offset_reset(str): 从主题开始的位置消费,可选值为 &#39;earliest&#39; 或 &#39;latest&#39;。</span>
<span class="sd"> timeout(int): 消费者轮询超时时间</span>
<span class="sd"> message_limit(int): 需要处理的消息数量限制</span>

<span class="sd"> Returns:</span>
<span class="sd"> dict: 消费者收到的记录列表,每个记录为一个字典,包含 key, value, topic, partition, offset 信息。</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># 定义 Kafka 配置</span>
<span class="n">conf</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;bootstrap.servers&#39;</span><span class="p">:</span> <span class="s1">&#39;,&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">broker_ips</span><span class="p">),</span>
<span class="s1">&#39;group.id&#39;</span><span class="p">:</span> <span class="n">group_id</span><span class="p">,</span>
<span class="s1">&#39;auto.offset.reset&#39;</span><span class="p">:</span> <span class="n">auto_offset_reset</span><span class="p">,</span>
<span class="p">}</span>

<span class="c1"># 创建消费者</span>
<span class="n">consumer</span> <span class="o">=</span> <span class="n">Consumer</span><span class="p">(</span><span class="n">conf</span><span class="p">)</span>

<span class="c1"># 订阅主题</span>
<span class="n">consumer</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="n">topics</span><span class="p">)</span>

<span class="c1"># 定义一个列表,用于保存接收到的消息</span>
<span class="n">messages</span> <span class="o">=</span> <span class="p">[]</span>

<span class="c1"># 持续监听并处理消息,直到达到消息限制</span>
<span class="n">processed_messages</span> <span class="o">=</span> <span class="mi">0</span>
<span class="k">while</span> <span class="n">message_limit</span> <span class="ow">is</span> <span class="kc">None</span> <span class="ow">or</span> <span class="n">processed_messages</span> <span class="o">&lt;</span> <span class="n">message_limit</span><span class="p">:</span>
<span class="n">msg</span> <span class="o">=</span> <span class="n">consumer</span><span class="o">.</span><span class="n">poll</span><span class="p">(</span><span class="n">timeout</span><span class="p">)</span>

<span class="k">if</span> <span class="n">msg</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="c1"># 在这里可以处理没有消息的情况(例如记录或异步处理)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;kafka msg is null&quot;</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">msg</span><span class="o">.</span><span class="n">error</span><span class="p">():</span>
<span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&quot;Kafka error: </span><span class="si">{</span><span class="n">msg</span><span class="o">.</span><span class="n">error</span><span class="p">()</span><span class="si">}</span><span class="s2">&quot;</span><span class="p">)</span>
<span class="k">break</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">record</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;key&#39;</span><span class="p">:</span> <span class="n">msg</span><span class="o">.</span><span class="n">key</span><span class="p">(),</span>
<span class="s1">&#39;value&#39;</span><span class="p">:</span> <span class="n">msg</span><span class="o">.</span><span class="n">value</span><span class="p">(),</span>
<span class="s1">&#39;topic&#39;</span><span class="p">:</span> <span class="n">msg</span><span class="o">.</span><span class="n">topic</span><span class="p">(),</span>
<span class="s1">&#39;partition&#39;</span><span class="p">:</span> <span class="n">msg</span><span class="o">.</span><span class="n">partition</span><span class="p">(),</span>
<span class="s1">&#39;offset&#39;</span><span class="p">:</span> <span class="n">msg</span><span class="o">.</span><span class="n">offset</span><span class="p">(),</span>
<span class="p">}</span>
<span class="n">messages</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">record</span><span class="p">)</span>
<span class="n">processed_messages</span> <span class="o">+=</span> <span class="mi">1</span>

<span class="c1"># 关闭消费者</span>
<span class="n">consumer</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>

<span class="k">return</span> <span class="n">messages</span></div>
</pre></div>

</div>
</div>
<footer>

<hr/>

<div role="contentinfo">
<p>&#169; 版权所有 2023, LiAng88。</p>
</div>

利用 <a href="https://www.sphinx-doc.org/">Sphinx</a> 构建,使用的
<a href="https://github.com/readthedocs/sphinx_rtd_theme">主题</a>
<a href="https://readthedocs.org">Read the Docs</a> 开发.


</footer>
</div>
</div>
</section>
</div>
<script>
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>

</body>
</html>
123 changes: 123 additions & 0 deletions docs/_sphinx_javascript_frameworks_compat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/* Compatability shim for jQuery and underscores.js.
*
* Copyright Sphinx contributors
* Released under the two clause BSD licence
*/

/**
* small helper function to urldecode strings
*
* See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/decodeURIComponent#Decoding_query_parameters_from_a_URL
*/
jQuery.urldecode = function(x) {
if (!x) {
return x
}
return decodeURIComponent(x.replace(/\+/g, ' '));
};

/**
* small helper function to urlencode strings
*/
jQuery.urlencode = encodeURIComponent;

/**
* This function returns the parsed url parameters of the
* current request. Multiple values per key are supported,
* it will always return arrays of strings for the value parts.
*/
jQuery.getQueryParameters = function(s) {
if (typeof s === 'undefined')
s = document.location.search;
var parts = s.substr(s.indexOf('?') + 1).split('&');
var result = {};
for (var i = 0; i < parts.length; i++) {
var tmp = parts[i].split('=', 2);
var key = jQuery.urldecode(tmp[0]);
var value = jQuery.urldecode(tmp[1]);
if (key in result)
result[key].push(value);
else
result[key] = [value];
}
return result;
};

/**
* highlight a given string on a jquery object by wrapping it in
* span elements with the given class name.
*/
jQuery.fn.highlightText = function(text, className) {
function highlight(node, addItems) {
if (node.nodeType === 3) {
var val = node.nodeValue;
var pos = val.toLowerCase().indexOf(text);
if (pos >= 0 &&
!jQuery(node.parentNode).hasClass(className) &&
!jQuery(node.parentNode).hasClass("nohighlight")) {
var span;
var isInSVG = jQuery(node).closest("body, svg, foreignObject").is("svg");
if (isInSVG) {
span = document.createElementNS("http://www.w3.org/2000/svg", "tspan");
} else {
span = document.createElement("span");
span.className = className;
}
span.appendChild(document.createTextNode(val.substr(pos, text.length)));
node.parentNode.insertBefore(span, node.parentNode.insertBefore(
document.createTextNode(val.substr(pos + text.length)),
node.nextSibling));
node.nodeValue = val.substr(0, pos);
if (isInSVG) {
var rect = document.createElementNS("http://www.w3.org/2000/svg", "rect");
var bbox = node.parentElement.getBBox();
rect.x.baseVal.value = bbox.x;
rect.y.baseVal.value = bbox.y;
rect.width.baseVal.value = bbox.width;
rect.height.baseVal.value = bbox.height;
rect.setAttribute('class', className);
addItems.push({
"parent": node.parentNode,
"target": rect});
}
}
}
else if (!jQuery(node).is("button, select, textarea")) {
jQuery.each(node.childNodes, function() {
highlight(this, addItems);
});
}
}
var addItems = [];
var result = this.each(function() {
highlight(this, addItems);
});
for (var i = 0; i < addItems.length; ++i) {
jQuery(addItems[i].parent).before(addItems[i].target);
}
return result;
};

/*
* backward compatibility for jQuery.browser
* This will be supported until firefox bug is fixed.
*/
if (!jQuery.browser) {
jQuery.uaMatch = function(ua) {
ua = ua.toLowerCase();

var match = /(chrome)[ \/]([\w.]+)/.exec(ua) ||
/(webkit)[ \/]([\w.]+)/.exec(ua) ||
/(opera)(?:.*version|)[ \/]([\w.]+)/.exec(ua) ||
/(msie) ([\w.]+)/.exec(ua) ||
ua.indexOf("compatible") < 0 && /(mozilla)(?:.*? rv:([\w.]+)|)/.exec(ua) ||
[];

return {
browser: match[ 1 ] || "",
version: match[ 2 ] || "0"
};
};
jQuery.browser = {};
jQuery.browser[jQuery.uaMatch(navigator.userAgent).browser] = true;
}
Loading

0 comments on commit f6eceeb

Please sign in to comment.