Tokio/client API state lock improvements (#113)

* (cargo-release) version 0.36.0

* (cargo-release) version 0.36.0

* (cargo-release) version 0.36.0

* RwLock from future-locks to improve compatibility with Tokio

* State management update for futures-lock

* Docs and examples updates
This commit is contained in:
Abdulla Abdurakhmanov 2022-05-09 18:34:57 +02:00 committed by GitHub
parent 79e74dd5ab
commit 552f4997a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 141 additions and 45 deletions

View file

@ -24,11 +24,10 @@ let listener_environment = Arc::new(
async fn test_push_events_function(
event: SlackPushEvent,
client: Arc<SlackHyperClient>,
user_states_storage: Arc<SlackClientEventsUserState>,
user_state_storage: SlackClientEventsUserState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// SlackClientEventsUserState is an alias for RwLock<SlackClientEventsUserStateStorage>
let states = user_states_storage.read().unwrap();
let states = user_state_storage.read().await;
let user_state: Option<&UserStateExample> =
states.get_user_state::<UserStateExample>();
@ -43,11 +42,10 @@ async fn test_push_events_function(
async fn test_push_events_function(
event: SlackPushEvent,
client: Arc<SlackHyperClient>,
user_states_storage: Arc<SlackClientEventsUserState>,
user_state_storage: SlackClientEventsUserState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// SlackClientEventsUserState is an alias for RwLock<SlackClientEventsUserStateStorage>
let states = user_states_storage.write().unwrap();
let states = user_state_storage.write().await;
states.set_user_state(UserStateExample(555));

View file

@ -1,6 +1,6 @@
[package]
name = "slack-morphism"
version = "0.35.0"
version = "0.36.0"
authors = ["Abdulla Abdurakhmanov <me@abdolence.dev>"]
edition = "2021"
license = "Apache-2.0"
@ -22,7 +22,7 @@ name = "slack_morphism"
path = "src/lib.rs"
[dependencies]
slack-morphism-models = { path = "../models", version = "^0.35.0"}
slack-morphism-models = { path = "../models", version = "^0.36.0"}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_with = "1.13"
@ -31,6 +31,7 @@ rsb_derive = "0.5"
url = "2.2"
futures = "0.3"
futures-util = "0.3"
futures-locks = "0.7"
base64 = "0.13"
hex = "0.4"
tracing = "0.1"

View file

@ -1,9 +1,11 @@
use crate::{SlackClient, SlackClientHttpConnector};
use futures::executor::block_on;
use futures::FutureExt;
use rsb_derive::Builder;
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tracing::*;
type UserStatesMap = HashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>;
@ -14,9 +16,11 @@ where
{
pub client: Arc<SlackClient<SCHC>>,
pub error_handler: BoxedErrorHandler<SCHC>,
pub user_state: Arc<SlackClientEventsUserState>,
pub user_state: SlackClientEventsUserState,
}
pub type SlackClientEventsUserState = futures_locks::RwLock<SlackClientEventsUserStateStorage>;
impl<SCHC> SlackClientEventsListenerEnvironment<SCHC>
where
SCHC: SlackClientHttpConnector + Send + Sync,
@ -25,7 +29,7 @@ where
Self {
client,
error_handler: Box::new(Self::empty_error_handler),
user_state: Arc::new(RwLock::new(SlackClientEventsUserStateStorage::new())),
user_state: SlackClientEventsUserState::new(SlackClientEventsUserStateStorage::new()),
}
}
@ -39,14 +43,18 @@ where
fn empty_error_handler(
err: Box<dyn std::error::Error + Send + Sync>,
_client: Arc<SlackClient<SCHC>>,
_user_state_storage: Arc<RwLock<SlackClientEventsUserStateStorage>>,
_user_state_storage: SlackClientEventsUserState,
) -> http::StatusCode {
error!("Slack listener error occurred: {:?}", err);
http::StatusCode::BAD_REQUEST
}
pub fn with_user_state<T: Send + Sync + 'static>(self, state: T) -> Self {
self.user_state.write().unwrap().set_user_state(state);
let future_init_state = self
.user_state
.write()
.map(|mut guard| guard.set_user_state(state));
block_on(future_init_state);
self
}
}
@ -82,14 +90,12 @@ impl SlackClientEventsUserStateStorage {
}
}
pub type SlackClientEventsUserState = RwLock<SlackClientEventsUserStateStorage>;
pub type BoxedErrorHandler<SCHC> = Box<ErrorHandler<SCHC>>;
pub type ErrorHandler<SCHC> = fn(
Box<dyn std::error::Error + Send + Sync + 'static>,
Arc<SlackClient<SCHC>>,
Arc<RwLock<SlackClientEventsUserStateStorage>>,
SlackClientEventsUserState,
) -> http::StatusCode;
#[derive(Debug, PartialEq, Clone, Builder)]
@ -161,4 +167,4 @@ impl SlackOAuthListenerConfig {
}
pub type UserCallbackFunction<E, IF, SCHC> =
fn(E, Arc<SlackClient<SCHC>>, Arc<RwLock<SlackClientEventsUserStateStorage>>) -> IF;
fn(E, Arc<SlackClient<SCHC>>, SlackClientEventsUserState) -> IF;

View file

@ -1,12 +1,12 @@
use crate::errors::*;
use crate::listener::SlackClientEventsUserStateStorage;
use crate::listener::SlackClientEventsUserState;
use crate::prelude::{SlackInteractionEvent, SlackPushEventCallback, UserCallbackFunction};
use crate::{SlackClient, SlackClientHttpConnector, UserCallbackResult};
use futures::future::BoxFuture;
use slack_morphism_models::events::{SlackCommandEvent, SlackCommandEventResponse};
use slack_morphism_models::socket_mode::SlackSocketModeHelloEvent;
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tracing::*;
pub trait SlackSocketModeListenerCallback<SCHC, RQ, RS>
@ -19,15 +19,13 @@ where
&self,
ev: RQ,
client: Arc<SlackClient<SCHC>>,
state_storage: Arc<RwLock<SlackClientEventsUserStateStorage>>,
state_storage: SlackClientEventsUserState,
) -> BoxFuture<'static, RS>;
}
impl<T, F, SCHC, RQ, RS> SlackSocketModeListenerCallback<SCHC, RQ, RS> for T
where
T: Send
+ Sync
+ Fn(RQ, Arc<SlackClient<SCHC>>, Arc<RwLock<SlackClientEventsUserStateStorage>>) -> F,
T: Send + Sync + Fn(RQ, Arc<SlackClient<SCHC>>, SlackClientEventsUserState) -> F,
F: Future<Output = RS> + Send + 'static,
SCHC: SlackClientHttpConnector + Send + Sync,
RQ: Send + Sync + 'static,
@ -37,7 +35,7 @@ where
&self,
ev: RQ,
client: Arc<SlackClient<SCHC>>,
state_storage: Arc<RwLock<SlackClientEventsUserStateStorage>>,
state_storage: SlackClientEventsUserState,
) -> BoxFuture<'static, RS> {
Box::pin(self(ev, client, state_storage))
}
@ -97,7 +95,7 @@ where
async fn empty_hello_callback(
event: SlackSocketModeHelloEvent,
_client: Arc<SlackClient<SCHC>>,
_states: Arc<RwLock<SlackClientEventsUserStateStorage>>,
_states: SlackClientEventsUserState,
) {
debug!("Received Slack hello for socket mode: {:?}", event);
}
@ -116,7 +114,7 @@ where
async fn empty_command_events_callback(
event: SlackCommandEvent,
_client: Arc<SlackClient<SCHC>>,
_states: Arc<RwLock<SlackClientEventsUserStateStorage>>,
_states: SlackClientEventsUserState,
) -> Result<SlackCommandEventResponse, Box<dyn std::error::Error + Send + Sync>> {
warn!("No callback is specified for a command event: {:?}", event);
Err(Box::new(SlackClientError::SystemError(
@ -139,7 +137,7 @@ where
async fn empty_interaction_events_callback(
event: SlackInteractionEvent,
_client: Arc<SlackClient<SCHC>>,
_states: Arc<RwLock<SlackClientEventsUserStateStorage>>,
_states: SlackClientEventsUserState,
) -> UserCallbackResult<()> {
warn!(
"No callback is specified for interactive events: {:?}",
@ -165,7 +163,7 @@ where
async fn empty_push_events_callback(
event: SlackPushEventCallback,
_client: Arc<SlackClient<SCHC>>,
_states: Arc<RwLock<SlackClientEventsUserStateStorage>>,
_states: SlackClientEventsUserState,
) -> UserCallbackResult<()> {
warn!("No callback is specified for a push event: {:?}", event);

View file

@ -1,6 +1,6 @@
[package]
name = "slack-morphism-hyper"
version = "0.35.0"
version = "0.36.0"
authors = ["Abdulla Abdurakhmanov <me@abdolence.dev>"]
edition = "2021"
license = "Apache-2.0"
@ -18,8 +18,8 @@ name = "slack_morphism_hyper"
path = "src/lib.rs"
[dependencies]
slack-morphism = { path = "../client", version = "^0.35.0"}
slack-morphism-models = { path = "../models", version = "^0.35.0"}
slack-morphism = { path = "../client", version = "^0.36.0"}
slack-morphism-models = { path = "../models", version = "^0.36.0"}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_with = "1.13"

View file

@ -10,7 +10,7 @@ use std::sync::Arc;
async fn test_oauth_install_function(
resp: SlackOAuthV2AccessTokenResponse,
_client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) {
println!("{:#?}", resp);
}
@ -18,11 +18,11 @@ async fn test_oauth_install_function(
async fn test_push_events_function(
event: SlackPushEvent,
_client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Read state
let current_state = {
let states = _states.read().unwrap();
let states = _states.read().await;
println!("{:#?}", states.get_user_state::<UserStateExample>());
println!("{:#?}", states.len());
UserStateExample(states.get_user_state::<UserStateExample>().unwrap().0 + 1)
@ -30,7 +30,7 @@ async fn test_push_events_function(
// Write state
{
let mut states = _states.write().unwrap();
let mut states = _states.write().await;
states.set_user_state::<UserStateExample>(current_state);
println!("{:#?}", states.get_user_state::<UserStateExample>());
}
@ -42,7 +42,7 @@ async fn test_push_events_function(
async fn test_interaction_events_function(
event: SlackInteractionEvent,
_client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("{:#?}", event);
Ok(())
@ -51,7 +51,7 @@ async fn test_interaction_events_function(
async fn test_command_events_function(
event: SlackCommandEvent,
_client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) -> Result<SlackCommandEventResponse, Box<dyn std::error::Error + Send + Sync>> {
let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into();
let token: SlackApiToken = SlackApiToken::new(token_value);
@ -70,7 +70,7 @@ async fn test_command_events_function(
fn test_error_handler(
err: Box<dyn std::error::Error + Send + Sync>,
_client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) -> http::StatusCode {
println!("{:#?}", err);

View file

@ -6,7 +6,7 @@ use std::sync::Arc;
async fn test_interaction_events_function(
event: SlackInteractionEvent,
_client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("{:#?}", event);
Ok(())
@ -15,7 +15,7 @@ async fn test_interaction_events_function(
async fn test_command_events_function(
event: SlackCommandEvent,
client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) -> Result<SlackCommandEventResponse, Box<dyn std::error::Error + Send + Sync>> {
println!("{:#?}", event);
@ -37,7 +37,7 @@ async fn test_command_events_function(
async fn test_push_events_sm_function(
event: SlackPushEventCallback,
_client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("{:#?}", event);
Ok(())
@ -46,7 +46,7 @@ async fn test_push_events_sm_function(
fn test_error_handler(
err: Box<dyn std::error::Error + Send + Sync>,
_client: Arc<SlackHyperClient>,
_states: Arc<SlackClientEventsUserState>,
_states: SlackClientEventsUserState,
) -> http::StatusCode {
println!("{:#?}", err);

View file

@ -0,0 +1,93 @@
use slack_morphism::prelude::*;
use slack_morphism_hyper::*;
use std::sync::Arc;
#[derive(Debug, Clone)]
struct UserStateExample(u64);
async fn test_push_events_sm_function(
_event: SlackPushEventCallback,
_client: Arc<SlackHyperClient>,
user_state: SlackClientEventsUserState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Read state
let current_state_example: UserStateExample = {
let storage = user_state.read().await;
storage
.get_user_state::<UserStateExample>()
.unwrap()
.clone()
};
// Write state
{
let mut storage = user_state.write().await;
let updated_state = UserStateExample(current_state_example.0 + 1);
storage.set_user_state::<UserStateExample>(updated_state);
println!(
"Updating user state from {:#?} to {:#?}",
current_state_example,
storage.get_user_state::<UserStateExample>()
);
}
Ok(())
}
fn test_error_handler(
err: Box<dyn std::error::Error + Send + Sync>,
_client: Arc<SlackHyperClient>,
_states: SlackClientEventsUserState,
) -> http::StatusCode {
println!("{:#?}", err);
// This return value should be OK if we want to return successful ack to the Slack server using Web-sockets
// https://api.slack.com/apis/connections/socket-implement#acknowledge
// so that Slack knows whether to retry
http::StatusCode::OK
}
async fn test_client_with_socket_mode() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new()));
let socket_mode_callbacks =
SlackSocketModeListenerCallbacks::new().with_push_events(test_push_events_sm_function);
let listener_environment = Arc::new(
SlackClientEventsListenerEnvironment::new(client.clone())
.with_error_handler(test_error_handler)
.with_user_state(UserStateExample(0)),
);
let socket_mode_listener = SlackClientSocketModeListener::new(
&SlackClientSocketModeConfig::new(),
listener_environment.clone(),
socket_mode_callbacks,
);
let app_token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_APP_TOKEN")?.into();
let app_token: SlackApiToken = SlackApiToken::new(app_token_value);
socket_mode_listener.listen_for(&app_token).await?;
socket_mode_listener.serve().await;
Ok(())
}
pub fn config_env_var(name: &str) -> Result<String, String> {
std::env::var(name).map_err(|e| format!("{}: {}", name, e))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let subscriber = tracing_subscriber::fmt()
.with_env_filter("slack_morphism_hyper=debug,slack_morphism=debug")
.finish();
tracing::subscriber::set_global_default(subscriber)?;
test_client_with_socket_mode().await?;
Ok(())
}

View file

@ -11,7 +11,7 @@ use hyper::body::*;
use hyper::client::connect::Connect;
use hyper::{Method, Request, Response};
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tracing::*;
impl<H: 'static + Send + Sync + Connect + Clone> SlackClientEventsHyperListener<H> {
@ -35,7 +35,7 @@ impl<H: 'static + Send + Sync + Connect + Clone> SlackClientEventsHyperListener<
req: Request<Body>,
config: &SlackOAuthListenerConfig,
client: Arc<SlackClient<SlackClientHyperConnector<H>>>,
user_state_storage: Arc<RwLock<SlackClientEventsUserStateStorage>>,
user_state_storage: SlackClientEventsUserState,
install_service_fn: UserCallbackFunction<
SlackOAuthV2AccessTokenResponse,
impl Future<Output = ()> + 'static + Send,

View file

@ -1,6 +1,6 @@
[package]
name = "slack-morphism-models"
version = "0.35.0"
version = "0.36.0"
authors = ["Abdulla Abdurakhmanov <me@abdolence.dev>"]
edition = "2021"
license = "Apache-2.0"