1- use futures:: { future, StreamExt } ;
1+ use futures:: { future, stream , StreamExt } ;
22use k8s_openapi:: api:: {
33 apps:: v1:: Deployment ,
44 core:: v1:: { ConfigMap , Secret } ,
55} ;
66use kube:: {
7+ api:: { ApiResource , DynamicObject , GroupVersionKind } ,
8+ core:: TypedResource ,
79 runtime:: {
8- reflector,
9- reflector:: { ObjectRef , Store } ,
10- watcher, WatchStreamExt ,
10+ reflector:: {
11+ store:: { CacheWriter , Writer } ,
12+ ObjectRef , Store ,
13+ } ,
14+ watcher:: { self , dynamic_watcher} ,
15+ WatchStreamExt ,
1116 } ,
12- Api , Client ,
17+ Api , Client , Resource ,
1318} ;
19+ use parking_lot:: RwLock ;
20+ use serde:: de:: DeserializeOwned ;
1421use std:: sync:: Arc ;
1522use tracing:: * ;
1623
17- // This does not work because Resource trait is not dyn safe.
18- /*
19- use std::any::TypeId;
2024use std:: collections:: HashMap ;
21- use k8s_openapi::NamespaceResourceScope;
22- use kube::api::{Resource, ResourceExt};
23- struct MultiStore {
24- stores: HashMap<TypeId, Store<dyn Resource<DynamicType = (), Scope = NamespaceResourceScope>>>,
25- }
26- impl MultiStore {
27- fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<K>> {
28- let oref = ObjectRef::<K>::new(name).within(ns);
29- if let Some(store) = self.stores.get(&TypeId::of::<K>()) {
30- store.get(oref)
31- } else {
32- None
33- }
34- }
35- }*/
3625
37- // explicit store can work
26+ type Cache = Arc < RwLock < HashMap < GroupVersionKind , Writer < DynamicObject > > > > ;
27+
28+ #[ derive( Default , Clone ) ]
29+ struct MultiWriter {
30+ store : Cache ,
31+ buffer : Cache ,
32+ }
33+
34+ #[ derive( Default , Clone ) ]
3835struct MultiStore {
39- deploys : Store < Deployment > ,
40- cms : Store < ConfigMap > ,
41- secs : Store < Secret > ,
36+ store : Cache ,
4237}
43- // but using generics to help out won't because the K needs to be concretised
44- /*
45- impl MultiStore {
46- fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<Option<K>>> {
47- let oref = ObjectRef::<K>::new(name).within(ns);
48- let kind = K::kind(&()).to_owned();
49- match kind.as_ref() {
50- "Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
51- "ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
52- "Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
53- _ => None,
38+
39+ impl MultiWriter {
40+ fn as_reader ( & self ) -> MultiStore {
41+ MultiStore {
42+ store : self . store . clone ( ) ,
5443 }
55- None
5644 }
5745}
58- */
59- // so left with this
6046
6147impl MultiStore {
62- fn get_deploy ( & self , name : & str , ns : & str ) -> Option < Arc < Deployment > > {
63- self . deploys . get ( & ObjectRef :: < Deployment > :: new ( name) . within ( ns) )
48+ fn get < K : Resource < DynamicType = impl Default > + DeserializeOwned + Clone > (
49+ & self ,
50+ name : & str ,
51+ ns : & str ,
52+ ) -> Option < Arc < K > > {
53+ let oref = ObjectRef :: < K > :: new ( name) . within ( ns) . erase ( ) ;
54+ let store = self . get_store :: < K > ( ) ?;
55+ let obj = store. get ( & oref) ?. as_ref ( ) . clone ( ) ;
56+ obj. try_parse ( ) . ok ( ) . map ( Arc :: new)
6457 }
6558
66- fn get_secret ( & self , name : & str , ns : & str ) -> Option < Arc < Secret > > {
67- self . secs . get ( & ObjectRef :: < Secret > :: new ( name) . within ( ns) )
59+ fn get_store < K : Resource < DynamicType = impl Default > + DeserializeOwned + Clone > (
60+ & self ,
61+ ) -> Option < Store < DynamicObject > > {
62+ Some ( self . store . read ( ) . get ( & K :: gvk ( & Default :: default ( ) ) ) ?. as_reader ( ) )
6863 }
64+ }
6965
70- fn get_cm ( & self , name : & str , ns : & str ) -> Option < Arc < ConfigMap > > {
71- self . cms . get ( & ObjectRef :: < ConfigMap > :: new ( name) . within ( ns) )
66+ impl CacheWriter < DynamicObject > for MultiWriter {
67+ /// Applies a single watcher event to the store
68+ fn apply_watcher_event ( & mut self , event : & watcher:: Event < DynamicObject > ) {
69+ match event {
70+ watcher:: Event :: Init | watcher:: Event :: InitDone ( None ) => { }
71+ watcher:: Event :: Apply ( obj) | watcher:: Event :: Delete ( obj) => {
72+ let mut stores = self . store . write ( ) ;
73+ if stores. get ( & obj. gvk ( ) ) . is_none ( ) {
74+ let store = Writer :: new ( ApiResource :: from_gvk ( & obj. gvk ( ) ) ) ;
75+ stores. insert ( obj. gvk ( ) , store) ;
76+ } ;
77+ if let Some ( store) = stores. get_mut ( & obj. gvk ( ) ) {
78+ store. apply_watcher_event ( event) ;
79+ } ;
80+ }
81+ watcher:: Event :: InitApply ( obj) => {
82+ let mut buffer = self . buffer . write ( ) ;
83+ if buffer. get ( & obj. gvk ( ) ) . is_none ( ) {
84+ let store = Writer :: new ( ApiResource :: from_gvk ( & obj. gvk ( ) ) ) ;
85+ buffer. insert ( obj. gvk ( ) , store) ;
86+ } ;
87+ if let Some ( store) = buffer. get_mut ( & obj. gvk ( ) ) {
88+ store. apply_watcher_event ( event) ;
89+ } ;
90+ }
91+ watcher:: Event :: InitDone ( Some ( obj) ) => {
92+ let mut buffer = self . buffer . write ( ) ;
93+ if let Some ( mut store) = buffer. remove ( & obj. gvk ( ) ) {
94+ store. apply_watcher_event ( event) ;
95+ self . store . write ( ) . insert ( obj. gvk ( ) , store) ;
96+ }
97+ }
98+ }
7299 }
73100}
74101
@@ -77,60 +104,43 @@ async fn main() -> anyhow::Result<()> {
77104 tracing_subscriber:: fmt:: init ( ) ;
78105 let client = Client :: try_default ( ) . await ?;
79106
80- let deploys: Api < Deployment > = Api :: default_namespaced ( client. clone ( ) ) ;
81- let cms: Api < ConfigMap > = Api :: default_namespaced ( client. clone ( ) ) ;
82- let secret: Api < Secret > = Api :: default_namespaced ( client. clone ( ) ) ;
83-
84- let ( dep_reader, dep_writer) = reflector:: store :: < Deployment > ( ) ;
85- let ( cm_reader, cm_writer) = reflector:: store :: < ConfigMap > ( ) ;
86- let ( sec_reader, sec_writer) = reflector:: store :: < Secret > ( ) ;
107+ // multistore
108+ let combo_stream = stream:: select_all ( vec ! [
109+ dynamic_watcher( Api :: <Deployment >:: all( client. clone( ) ) , Default :: default ( ) ) . boxed( ) ,
110+ dynamic_watcher( Api :: <ConfigMap >:: all( client. clone( ) ) , Default :: default ( ) ) . boxed( ) ,
111+ dynamic_watcher( Api :: <Secret >:: all( client. clone( ) ) , Default :: default ( ) ) . boxed( ) ,
112+ ] ) ;
87113
88- let cfg = watcher:: Config :: default ( ) ;
89- let dep_watcher = watcher ( deploys, cfg. clone ( ) )
90- . reflect ( dep_writer)
91- . applied_objects ( )
92- . for_each ( |_| future:: ready ( ( ) ) ) ;
93- let cm_watcher = watcher ( cms, cfg. clone ( ) )
94- . reflect ( cm_writer)
114+ let multi_writer = MultiWriter :: default ( ) ;
115+ let watcher = combo_stream
116+ . reflect ( multi_writer. clone ( ) )
95117 . applied_objects ( )
96118 . for_each ( |_| future:: ready ( ( ) ) ) ;
97- let sec_watcher = watcher ( secret, cfg)
98- . reflect ( sec_writer)
99- . applied_objects ( )
100- . for_each ( |_| future:: ready ( ( ) ) ) ;
101- // poll these forever
102-
103- // multistore
104- let stores = MultiStore {
105- deploys : dep_reader,
106- cms : cm_reader,
107- secs : sec_reader,
108- } ;
109119
110120 // simulate doing stuff with the stores from some other thread
111121 tokio:: spawn ( async move {
112- // Show state every 5 seconds of watching
113- info ! ( "waiting for them to be ready" ) ;
114- stores. deploys . wait_until_ready ( ) . await . unwrap ( ) ;
115- stores. cms . wait_until_ready ( ) . await . unwrap ( ) ;
116- stores. secs . wait_until_ready ( ) . await . unwrap ( ) ;
117- info ! ( "stores initialised" ) ;
118122 // can use helper accessors
119- info ! (
120- "common cm: {:?}" ,
121- stores. get_cm( "kube-root-ca.crt" , "kube-system" ) . unwrap( )
122- ) ;
123123 loop {
124124 tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) . await ;
125+ info ! (
126+ "cache content: {:?}" ,
127+ multi_writer. as_reader( ) . store. read( ) . keys( )
128+ ) ;
129+ info ! (
130+ "common cm: {:?}" ,
131+ multi_writer
132+ . as_reader( )
133+ . get:: <ConfigMap >( "kube-root-ca.crt" , "kube-system" )
134+ ) ;
125135 // access individual sub stores
126- info ! ( "Current deploys count: {}" , stores. deploys. state( ) . len( ) ) ;
136+ if let Some ( deploys) = multi_writer. as_reader ( ) . get_store :: < Deployment > ( ) {
137+ info ! ( "Current deploys count: {}" , deploys. state( ) . len( ) ) ;
138+ }
127139 }
128140 } ) ;
129- // info!("long watches starting");
141+ info ! ( "long watches starting" ) ;
130142 tokio:: select! {
131- r = dep_watcher => println!( "dep watcher exit: {r:?}" ) ,
132- r = cm_watcher => println!( "cm watcher exit: {r:?}" ) ,
133- r = sec_watcher => println!( "sec watcher exit: {r:?}" ) ,
143+ r = watcher => println!( "watcher exit: {r:?}" ) ,
134144 }
135145
136146 Ok ( ( ) )
0 commit comments