use std::collections::{HashMap, HashSet}; use std::{io}; use std::convert::TryFrom; use std::fs::File; use std::future::Future; use std::path::{PathBuf}; use std::io::{BufReader, BufWriter, ErrorKind, Read, Write}; use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; use futures::{StreamExt, TryStreamExt}; use kuchiki::iter::NodeIterator; use serde_json::to_writer; use tokio::net::TcpStream; use tokio_rustls::client::TlsStream; use tokio_rustls::rustls::{ClientConfig, RootCertStore}; use tokio_rustls::rustls::pki_types::ServerName; use crate::config::Config; use crate::{add_email}; use tokio::task; use tokio::time::sleep; #[cfg(not(target_os = "wasi"))] use async_imap::{Client, Session}; #[cfg(not(target_os = "wasi"))] use async_imap::extensions::idle::IdleResponse::NewData; #[cfg(target_os = "wasi")] use async_imap_wasi::{Client, Session}; #[cfg(target_os = "wasi")] use async_imap_wasi::extensions::idle::IdleResponse::NewData; /// create TLS connect with the IMAP server pub async fn connect_to_imap() -> anyhow::Result>>{ let mut root_cert_store = RootCertStore::empty(); root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); let config = ClientConfig::builder() .with_root_certificates(root_cert_store) .with_no_client_auth(); let connector = tokio_rustls::TlsConnector::from(Arc::new(config)); let dnsname = ServerName::try_from(Config::global().imap_domain.clone()).unwrap(); let stream = TcpStream::connect(format!("{}:{}", Config::global().imap_domain.clone(), Config::global().imap_port.clone())).await?; let stream = connector.connect(dnsname, stream).await?; #[cfg(not(target_os = "wasi"))] let client = async_imap::Client::new(stream); #[cfg(target_os = "wasi")] let client = async_imap_wasi::Client::new(stream); Ok(client) } /// download all emails from the server and persist them pub async fn download_email_from_imap() -> anyhow::Result>{ let mut client = connect_to_imap().await?; let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await; let mut session = match session_result { Ok(session) => {session} Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))} }; let mut stored_paths: Vec<(u32, PathBuf)> = vec![]; match list_imap_folders(&mut session).await { Ok(folders) => { for folder in folders { println!("Start downloading {}", folder.clone()); let new_paths = match fetch_and_store_emails(&mut session, folder.clone()).await { Ok(paths) => { println!("Emails fetched and stored successfully for {}.", folder); paths }, Err(err) => { eprintln!("Error during email fetching and storing: {}", err); vec![] } }; stored_paths.extend(new_paths); } } Err(_) => {return Err(anyhow!("Unable to retrieve folders from IMAP server"))} } // Logout from the IMAP session session.logout().await.expect("Unable to logout"); Ok(stored_paths) } /// return Vec with all mailboxes pub async fn list_imap_folders(session: &mut Session>) -> anyhow::Result> { let mut folders: Vec = vec![]; let mut folders_stream = session.list(None, Some("*")).await?; while let Some(folder_result) = folders_stream.next().await { match folder_result { Ok(folder) => { folders.push(folder.name().to_string()) } Err(_) => {} } } Ok(folders) } /// download all emails from one mailbox pub async fn fetch_and_store_emails(session: &mut Session>, list: String) -> anyhow::Result> { session.select(list.clone()).await?; // Create directories for maildir std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("new")) .expect(&*("Unable to create 'new' directory in ".to_owned() + &*list.clone())); std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("cur")) .expect(&*("Unable to create 'cur' directory in ".to_owned() + &*list.clone())); std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("tmp")) .expect(&*("Unable to create 'tmp' directory in ".to_owned() + &*list.clone())); let uids_path = Config::global().maildir.clone().join(".uids.json"); let uids_server = session.uid_search("ALL").await.unwrap(); let uids_local = read_local_uids(uids_path.clone()).unwrap(); let mut stored_paths: Vec<(u32, PathBuf)> = vec![]; for uid in uids_server { if let Some(uids) = uids_local.get(&list) { if uids.contains(&uid.to_string()) { continue; } } let mut messages_stream = session.uid_fetch(uid.to_string(), "(UID BODY[])").await?; let mut message = None; while let Some(message_result) = messages_stream.next().await { match message_result { Ok(message_ok) => { message = Some(message_ok); } Err(_) => { return Err(anyhow!("Unable to retrieve message")); } } } if let Some(msg) = &message { if let Some(body) = msg.body() { let mail_file = store(Config::global().maildir.clone().join(list.clone()), uid.clone().to_string(), "new".to_string(), body, ""); match mail_file { Ok(file) => stored_paths.push((uid.to_string().parse().unwrap(), file)), Err(e) => eprintln!("Failed to store email: {}", e), } } else { eprintln!("Failed to retrieve email body"); } if let Some(uid) = msg.uid { update_local_uids(uid.to_string(), list.clone(), uids_path.clone()) .expect("Cannot write uid to .uids.json"); } println!("Downloaded message with UID {}", uid.to_string()); } else { return Err(anyhow!("No message found with the given UID")); } } Ok(stored_paths) } /// read uids that have been already downloaded fn read_local_uids(uids_path: PathBuf) -> anyhow::Result>> { let file = match File::open(&uids_path) { Ok(file) => file, Err(_) => return Ok(HashMap::new()), // Propagate other errors }; let mut reader = BufReader::new(file); let mut content = String::new(); reader.read_to_string(&mut content)?; if content.trim().is_empty() { // Return an empty HashMap if the file is empty return Ok(HashMap::new()); } // Deserialize the JSON into a temporary structure let data: HashMap> = match serde_json::from_str(&content) { Ok(data) => data, Err(e) => return Err(anyhow!("Error deserializing local uids")), // Handle JSON parsing errors }; // Convert Vec to HashSet let mut result: HashMap> = HashMap::new(); for (key, value) in data { result.insert(key, value.into_iter().collect()); } Ok(result) } /// add uid to the local list of downloaded uids fn update_local_uids(uid: String, list: String, uids_path: PathBuf) -> anyhow::Result<()> { let file = std::fs::OpenOptions::new() .write(true) .create(true) .open(uids_path.clone())?; let mut data = read_local_uids(uids_path.clone())?; data.entry(list) .or_insert_with(HashSet::new) .insert(uid); let writer = BufWriter::new(file); // Convert HashSet back to Vec for serialization let mut temp_data: HashMap> = HashMap::new(); for (key, value) in data { temp_data.insert(key.clone(), value.iter().cloned().collect()); } // Serialize the data back to JSON and write to file to_writer(writer, &temp_data)?; Ok(()) } /// delete single message remotely by uid (marks deleted and deletes from the server at the same time) pub async fn delete_email_by_uid(list: String, uid: u32) -> anyhow::Result<()>{ // TODO split to mark_deleted() and expunge() let mut client = connect_to_imap().await?; let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await; let mut session = match session_result { Ok(session) => {session} Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))} }; session.select(list.clone()).await?; let updates_stream = session.store(format!("{}", uid), "+FLAGS (\\Deleted)").await?; let _updates: Vec<_> = updates_stream.try_collect().await?; let _stream = session.expunge().await?; drop(_stream); session.logout().await?; Ok(()) } /// create a folder remotely pub async fn create_folder(name: String) -> anyhow::Result<()>{ let mut client = connect_to_imap().await?; let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await; let mut session = match session_result { Ok(session) => {session} Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))} }; session.create(name.clone()).await?; session.logout().await?; Ok(()) } /// rename a folder remotely pub async fn rename_folder(name: String, new_name: String) -> anyhow::Result<()>{ let mut client = connect_to_imap().await?; let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await; let mut session = match session_result { Ok(session) => {session} Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))} }; session.rename(name.clone(), new_name.clone()).await?; session.logout().await?; Ok(()) } /// delete a folder remotely pub async fn delete_folder(name: String) -> anyhow::Result<()>{ let mut client = connect_to_imap().await?; let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await; let mut session = match session_result { Ok(session) => {session} Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))} }; session.delete(name.clone()).await?; session.logout().await?; Ok(()) } /// run a async task to monitor any changes in the mailbox pub async fn check_for_updates(mailbox: String) -> anyhow::Result<()> { task::spawn(async move { let mut client = match connect_to_imap().await { Ok(client) => client, Err(e) => { return Err::<(), anyhow::Error>(anyhow!("Failed to connect to IMAP")); } }; let session_result = client .login(Config::global().username.clone(), Config::global().password.clone()) .await; let mut session = match session_result { Ok(session) => session, Err(_) => return Err(anyhow!("Unable to login to IMAP server")), }; session.select(mailbox.clone()).await?; loop { let mut idle = session.idle(); idle.init().await.unwrap(); let (idle_wait, interrupt) = idle.wait(); let idle_handle = task::spawn(async move { println!("IDLE: waiting for 30s"); // TODO remove debug prints sleep(Duration::from_secs(30)).await; println!("IDLE: waited 30 secs, now interrupting idle"); drop(interrupt); }); match idle_wait.await.unwrap() { // TODO add more cases like delete, move... NewData(data) => { // TODO do not update all emails (IMAP returns * {number} RECENT) and do it only for one mailbox let new_paths = download_email_from_imap().await.expect("Cannot download new emails"); for (uid, path) in new_paths.clone() { match add_email(path.clone(), uid.clone()){ Ok(_) => {} Err(_) => {println!("Error adding email from {:?}", path.clone())} }; } } reason => { println!("IDLE failed {:?}", reason); } } // Ensure the idle handle is dropped before the next loop iteration idle_handle.await.unwrap(); // Reassign session to prevent ownership issues in the next loop iteration session = idle.done().await.unwrap(); } }); Ok(()) } /// saves a raw email properly fn store(path: PathBuf, uid: String, subfolder: String, data: &[u8], info: &str) -> io::Result { // loop when conflicting filenames occur, as described at // http://www.courier-mta.org/maildir.html // this assumes that pid and hostname don't change. let mut tmppath = path.clone(); tmppath.push("tmp"); let mut file; loop { tmppath.push(format!("email_{uid}")); match std::fs::OpenOptions::new() .write(true) .create_new(true) .open(&tmppath) { Ok(f) => { file = f; break; } Err(err) => { if err.kind() != ErrorKind::AlreadyExists { return Err(err.into()); } tmppath.pop(); } } } /// At this point, `file` is our new file at `tmppath`. /// If we leave the scope of this function prior to /// successfully writing the file to its final location, /// we need to ensure that we remove the temporary file. /// This struct takes care of that detail. struct UnlinkOnError { path_to_unlink: Option, } impl Drop for UnlinkOnError { fn drop(&mut self) { if let Some(path) = self.path_to_unlink.take() { // Best effort to remove it std::fs::remove_file(path).ok(); } } } // Ensure that we remove the temporary file on failure let mut unlink_guard = UnlinkOnError { path_to_unlink: Some(tmppath.clone()), }; file.write_all(data)?; file.sync_all()?; let mut newpath = path.clone(); newpath.push(subfolder); let id = format!("email_{uid}"); newpath.push(format!("{}{}", id, info)); std::fs::rename(&tmppath, &newpath)?; unlink_guard.path_to_unlink.take(); Ok(newpath) }