Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 6a3e95d

Browse files
Release/1.1.2 Watcher Fix (#36)
1 parent 378bb17 commit 6a3e95d

File tree

9 files changed

+150
-29
lines changed

9 files changed

+150
-29
lines changed

.circleci/config.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,6 @@ jobs:
5050
- checkout
5151
- setup_remote_docker:
5252
reusable: true
53-
- run:
54-
name: Push rubygem to s3
55-
command: |
56-
.circleci/push_gem.sh
5753
- run:
5854
name: Build and push docker image to ecr
5955
command: |
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
---
2+
name: Bug report
3+
about: Report a bug encountered while operating fluent-plugin-kubernetes-objects
4+
title: ''
5+
labels: ''
6+
assignees: ''
7+
8+
---
9+
10+
<!-- Please use this template while reporting a bug and provide as much info as possible. Not doing so may result in your bug not being addressed in a timely manner. Thanks!
11+
12+
Please do not report security vulnerabilities with public GitHub issue reports. Please report security issues here: https://www.splunk.com/goto/report_vulnerabilities_prodsec
13+
-->
14+
15+
16+
**What happened**:
17+
18+
**What you expected to happen**:
19+
20+
**How to reproduce it (as minimally and precisely as possible)**:
21+
22+
**Anything else we need to know?**:
23+
24+
**Environment**:
25+
- Kubernetes version (use `kubectl version`):
26+
- Ruby version (use `ruby --version`):
27+
- OS (e.g: `cat /etc/os-release`):
28+
- Splunk version:
29+
- Others:
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
name: Enhancement Request
3+
about: Suggest an enhancement to the fluent-plugin-kubernetes-objects project
4+
title: ''
5+
labels: ''
6+
assignees: ''
7+
8+
---
9+
10+
<<!-- Please only use this template for submitting enhancement requests -->
11+
12+
**What would you like to be added**:
13+
14+
**Why is this needed**:
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
---
2+
name: Failing Test
3+
about: Report test failures in fluent-plugin-kubernetes-objects
4+
title: ''
5+
labels: ''
6+
assignees: ''
7+
8+
---
9+
10+
<!-- Please only use this template for submitting reports about failing tests -->
11+
12+
**Which test(s) are failing**:
13+
14+
**Since when has it been failing**:
15+
16+
**Reason for failure**:
17+
18+
**Anything else we need to know**:

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
## Proposed changes
2+
3+
Describe the big picture of your changes here to communicate to the maintainers why we should accept this pull request. If it fixes a bug or resolves a feature request, be sure to link to that issue.
4+
5+
## Types of changes
6+
7+
What types of changes does your code introduce?
8+
_Put an `x` in the boxes that apply_
9+
10+
- [ ] Bugfix (non-breaking change which fixes an issue)
11+
- [ ] New feature (non-breaking change which adds functionality)
12+
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
13+
14+
## Checklist
15+
16+
_Put an `x` in the boxes that apply._
17+
18+
- [ ] I have read the [CONTRIBUTING](https://github.com/splunk/fluent-plugin-kubernetes-objects/blob/develop/CONTRIBUTING.md) doc
19+
- [ ] I have read the [CLA](https://github.com/splunk/fluent-plugin-kubernetes-objects/blob/develop/CLA.md)
20+
- [ ] I have added tests that prove my fix is effective or that my feature works
21+
- [ ] I have added necessary documentation (if appropriate)
22+
- [ ] Any dependent changes have been merged and published in downstream modules
23+

CLA.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
By submitting a Contribution to this Work, You agree that Your Contribution is made subject to the primary LICENSE
2+
file applicable to this Work. In addition, You represent that: (i) You are the copyright owner of the Contribution
3+
or (ii) You have the requisite rights to make the Contribution.
4+
5+
Definitions:
6+
7+
“You” shall mean: (i) yourself if you are making a Contribution on your own behalf; or (ii) your company,
8+
if you are making a Contribution on behalf of your company. If you are making a Contribution on behalf of your
9+
company, you represent that you have the requisite authority to do so.
10+
11+
"Contribution" shall mean any original work of authorship, including any modifications or additions to an existing
12+
work, that is intentionally submitted by You for inclusion in, or documentation of, this project/repository. For the
13+
purposes of this definition, "submitted" means any form of electronic, verbal, or written communication submitted for
14+
inclusion in this project/repository, including but not limited to communication on electronic mailing lists, source
15+
code control systems, and issue tracking systems that are managed by, or on behalf of, the maintainers of
16+
the project/repository.
17+
18+
“Work” shall mean the collective software, content, and documentation in this project/repository.

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.1
1+
1.1.2

lib/fluent/plugin/in_kubernetes_objects.rb

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
module Fluent::Plugin
77
class KubernetesObjectsInput < Fluent::Plugin::Input
8-
VERSION = '1.1.1'.freeze
8+
VERSION = '1.1.2'.freeze
99

1010
Fluent::Plugin.register_input('kubernetes_objects', self)
1111

@@ -102,7 +102,6 @@ def start
102102
end
103103

104104
def close
105-
@watchers.each &:finish if @watchers
106105
super
107106
end
108107

@@ -118,6 +117,21 @@ def generate_tag(item_name)
118117
[@tag_prefix, item_name, @tag_suffix].join
119118
end
120119

120+
def init_with_kubeconfig()
121+
options = {}
122+
config = Kubeclient::Config.read @kubeconfig
123+
current_context = config.context
124+
125+
@client = Kubeclient::Client.new(
126+
current_context.api_endpoint,
127+
current_context.api_version,
128+
options.merge(
129+
ssl_options: current_context.ssl_options,
130+
auth_options: current_context.auth_options
131+
)
132+
)
133+
end
134+
121135
def initialize_client
122136
# mostly borrowed from Fluentd Kubernetes Metadata Filter Plugin
123137
if @kubernetes_url.nil?
@@ -173,18 +187,7 @@ def start_pullers
173187
end
174188

175189
def start_watchers
176-
@watchers = @watch_objects.map do |o|
177-
o = o.to_h.dup
178-
o[:as] = :raw
179-
resource_name = o.delete(:resource_name)
180-
watch_interval = o.delete(:interval)
181-
182-
version = @storage.get(resource_name)
183-
o[:resource_version] = version if version
184-
@client.public_send("watch_#{resource_name}", o).tap do |watcher|
185-
create_watcher_thread resource_name, watcher, watch_interval
186-
end
187-
end
190+
@watch_objects.each(&method(:create_watcher_thread))
188191
end
189192

190193
def create_pull_thread(conf)
@@ -227,19 +230,35 @@ def create_pull_thread(conf)
227230
end
228231
end
229232

230-
def create_watcher_thread(object_name, watcher, interval)
231-
thread_create(:"watch_#{object_name}") do
232-
tag = generate_tag "#{object_name}.watch"
233+
def create_watcher_thread(conf)
234+
options = conf.to_h.dup
235+
options[:as] = :raw
236+
resource_name = options[:resource_name]
237+
version = @storage.get(resource_name)
238+
if version
239+
options[:resource_version] = version
240+
else
241+
options[:resource_version] = 0
242+
end
243+
244+
thread_create :"watch_#{resource_name}" do
233245
while thread_current_running?
234-
watcher.each do |entity|
235-
log.trace { "Received new object from watching #{object_name}" }
236-
entity = JSON.parse(entity)
237-
router.emit tag, Fluent::Engine.now, entity
238-
@storage.put object_name, entity['object']['metadata']['resourceVersion']
239-
sleep(interval)
246+
@client.public_send("watch_#{resource_name}", options).tap do |watcher|
247+
tag = generate_tag "#{resource_name}"
248+
watcher.each do |entity|
249+
begin
250+
entity = JSON.parse(entity)
251+
router.emit tag, Fluent::Engine.now, entity
252+
options[:resource_version] = entity['object']['metadata']['resourceVersion']
253+
@storage.put resource_name, entity['object']['metadata']['resourceVersion']
254+
rescue => e
255+
log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher."
256+
end
257+
end
240258
end
241259
end
242260
end
243261
end
244262
end
245263
end
264+

test/fluent/plugin/in_kubernetes_objects_test.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,12 @@
109109
</watch>
110110
CONF
111111

112+
stub_k8s_events params: {resourceVersion: "0"}
113+
stub_k8s_events params: {resourceVersion: "6621683"}
114+
112115
d.run expect_emits: 1, timeout: 3
113116
events = d.events
114-
expect(events.all? { |e| e[0] == 'kubernetes.events.watch'}).must_equal true
117+
expect(events.all? { |e| e[0] == 'kubernetes.events'}).must_equal true
115118
end
116119

117120
it "should use checkpoints for watching" do
@@ -133,6 +136,7 @@
133136
CONF
134137

135138
stub_k8s_events params: {resourceVersion: "123456"}
139+
stub_k8s_events params: {resourceVersion: "6621683"}
136140

137141
d.run expect_emits: 1, timeout: 3
138142
ensure

0 commit comments

Comments
 (0)